Commit dedbceec authored by cody koeninger, committed by Tathagata Das

[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

## What changes were proposed in this pull request?

New Kafka consumer api for the released 0.10 version of Kafka
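
A minimal usage sketch of the new API surface added here (ssc, topic names, broker addresses, and group id are placeholders, not prescribed by this patch):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Standard new-consumer configs; deserializers are required by the 0.10 consumer.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1,host2:port2",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

// Location and consumer strategies are the two new knobs introduced by this change.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                             // an existing StreamingContext
  PreferConsistent,                                // LocationStrategy
  Subscribe[String, String](List("topicA", "topicB"), kafkaParams))

stream.map(record => (record.key, record.value))   // ConsumerRecord is not serializable; extract fields early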

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #11863 from koeninger/kafka-0.9.
parent bde1d6a6
Showing changes with 3351 additions and 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10-assembly_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Integration for Kafka 0.10 Assembly</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>streaming-kafka-0-10-assembly</sbt.project.name>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!--
Demote dependencies already included in the Spark assembly to provided scope.
-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<classifier>${avro.mapred.classifier}</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
<resource>log4j.properties</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>streaming-kafka-0-10</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Integration for Kafka 0.10</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.10.0.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
import org.apache.kafka.common.{ KafkaException, TopicPartition }
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
* Consumer of a single topicpartition, intended for cached reuse.
* The underlying consumer is not threadsafe, so neither is this;
* processing the same topicpartition and group id in multiple threads is usually a bad idea anyway.
*/
private[kafka010]
class CachedKafkaConsumer[K, V] private(
val groupId: String,
val topic: String,
val partition: Int,
val kafkaParams: ju.Map[String, Object]) extends Logging {
assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
"groupId used for cache key must match the groupId in kafkaParams")
val topicPartition = new TopicPartition(topic, partition)
protected val consumer = {
val c = new KafkaConsumer[K, V](kafkaParams)
val tps = new ju.ArrayList[TopicPartition]()
tps.add(topicPartition)
c.assign(tps)
c
}
// TODO if the buffer were kept around as a random-access structure,
// we could possibly optimize re-computation of an RDD in the same batch
protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
protected var nextOffset = -2L
def close(): Unit = consumer.close()
/**
* Get the record for the given offset, waiting up to timeout ms if IO is necessary.
* Sequential forward access will use buffers, but random access will be horribly inefficient.
*/
def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
if (offset != nextOffset) {
logInfo(s"Initial fetch for $groupId $topic $partition $offset")
seek(offset)
poll(timeout)
}
if (!buffer.hasNext()) { poll(timeout) }
assert(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
var record = buffer.next()
if (record.offset != offset) {
logInfo(s"Buffer miss for $groupId $topic $partition $offset")
seek(offset)
poll(timeout)
assert(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
record = buffer.next()
assert(record.offset == offset,
s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
}
nextOffset = offset + 1
record
}
private def seek(offset: Long): Unit = {
logDebug(s"Seeking to $topicPartition $offset")
consumer.seek(topicPartition, offset)
}
private def poll(timeout: Long): Unit = {
val p = consumer.poll(timeout)
val r = p.records(topicPartition)
logDebug(s"Polled ${p.partitions()} ${r.size}")
buffer = r.iterator
}
}
private[kafka010]
object CachedKafkaConsumer extends Logging {
private case class CacheKey(groupId: String, topic: String, partition: Int)
// Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
/** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
def init(
initialCapacity: Int,
maxCapacity: Int,
loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
if (null == cache) {
logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
initialCapacity, loadFactor, true) {
override def removeEldestEntry(
entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
if (this.size > maxCapacity) {
try {
entry.getValue.consumer.close()
} catch {
case x: KafkaException =>
logError("Error closing oldest Kafka consumer", x)
}
true
} else {
false
}
}
}
}
}
/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If a matching consumer doesn't already exist, one will be created using kafkaParams.
*/
def get[K, V](
groupId: String,
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
CachedKafkaConsumer.synchronized {
val k = CacheKey(groupId, topic, partition)
val v = cache.get(k)
if (null == v) {
logInfo(s"Cache miss for $k")
logDebug(cache.keySet.toString)
val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
cache.put(k, c)
c
} else {
// any given topicpartition should have a consistent key and value type
v.asInstanceOf[CachedKafkaConsumer[K, V]]
}
}
/**
* Get a new instance, not associated with the global cache.
* The caller is responsible for closing it.
*/
def getUncached[K, V](
groupId: String,
topic: String,
partition: Int,
kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
/** Remove the consumer for the given groupId, topic, and partition, if it exists. */
def remove(groupId: String, topic: String, partition: Int): Unit = {
val k = CacheKey(groupId, topic, partition)
logInfo(s"Removing $k from cache")
val v = CachedKafkaConsumer.synchronized {
cache.remove(k)
}
if (null != v) {
v.close()
logInfo(s"Removed $k from cache")
}
}
}
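A hedged usage sketch of the consumer cache above; it mirrors how the KafkaRDD iterator later in this patch drives it (the class is package-private, so this is not a public API), with placeholder group id, topic, offsets, and params:

import java.{ util => ju }

// Illustrative only: sequential reads through the per-JVM consumer cache.
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "host1:port1,host2:port2")
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("group.id", "example-group")     // must match the groupId used as the cache key

CachedKafkaConsumer.init(16, 64, 0.75f)          // once per JVM, before any get
val consumer = CachedKafkaConsumer.get[String, String]("example-group", "example-topic", 0, kafkaParams)
(0L until 10L).foreach { offset =>
  val record = consumer.get(offset, 512)         // waits up to 512 ms if a poll is needed
  // record.key / record.value available here; sequential access reuses the internal buffer
}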
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
* Kafka 0.10 consumers can require additional, sometimes complex, setup after object
* instantiation. This interface encapsulates that process, and allows it to be checkpointed.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
trait ConsumerStrategy[K, V] {
/**
* Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
def executorKafkaParams: ju.Map[String, Object]
/**
* Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
* This consumer will be used on the driver to query for offsets only, not messages.
* @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
*/
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
}
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Subscribe[K, V] private(
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
) extends ConsumerStrategy[K, V] {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
consumer
}
}
/**
* :: Experimental ::
* Companion object for creating [[Subscribe]] strategy
*/
@Experimental
object Subscribe {
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, Long](offsets.asJava))
}
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
topics: Iterable[java.lang.String],
kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, Long]())
}
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
Subscribe[K, V](topics, kafkaParams, offsets)
}
/**
* :: Experimental ::
* Subscribe to a collection of topics.
* @param topics collection of topics to subscribe
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
topics: ju.Collection[java.lang.String],
kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
}
}
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
case class Assign[K, V] private(
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]
) extends ConsumerStrategy[K, V] {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
consumer
}
}
/**
* :: Experimental ::
* Companion object for creating [[Assign]] strategy
*/
@Experimental
object Assign {
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def apply[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, Long](offsets.asJava))
}
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def apply[K, V](
topicPartitions: Iterable[TopicPartition],
kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, Long]())
}
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def create[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, offsets)
}
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
* @param topicPartitions collection of TopicPartitions to assign
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def create[K, V](
topicPartitions: ju.Collection[TopicPartition],
kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
}
}
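A hedged sketch contrasting the two strategies above; topic names, partitions, start offsets, and kafkaParams are placeholders:

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:port1,host2:port2",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group")

// Subscribe by topic name; topicA partition 0 starts at offset 1234,
// everything else falls back to the committed offset or auto.offset.reset.
val byTopic = Subscribe[String, String](
  List("topicA", "topicB"),
  kafkaParams,
  Map(new TopicPartition("topicA", 0) -> 1234L))

// Assign a fixed set of partitions; no consumer-group rebalancing is involved.
val byPartition = Assign[String, String](
  List(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1)),
  kafkaParams)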
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator
/**
* A DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages
* per second that each '''partition''' will accept.
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param executorKafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>.
* Requires "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
private[spark] class DirectKafkaInputDStream[K, V](
_ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
val executorKafkaParams = {
val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
KafkaUtils.fixKafkaParams(ekp)
ekp
}
protected var currentOffsets = Map[TopicPartition, Long]()
@transient private var kc: Consumer[K, V] = null
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
kc = consumerStrategy.onStart(currentOffsets)
}
kc
}
override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
logError("Kafka ConsumerRecord is not serializable. " +
"Use .map to extract fields before calling .persist or .window")
super.persist(newLevel)
}
protected def getBrokers = {
val c = consumer
val result = new ju.HashMap[TopicPartition, String]()
val hosts = new ju.HashMap[TopicPartition, String]()
val assignments = c.assignment().iterator()
while (assignments.hasNext()) {
val tp: TopicPartition = assignments.next()
if (null == hosts.get(tp)) {
val infos = c.partitionsFor(tp.topic).iterator()
while (infos.hasNext()) {
val i = infos.next()
hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
}
}
result.put(tp, hosts.get(tp))
}
result
}
protected def getPreferredHosts: ju.Map[TopicPartition, String] = {
locationStrategy match {
case PreferBrokers => getBrokers
case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
case PreferFixed(hostMap) => hostMap
}
}
// Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData
/**
* Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
*/
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new DirectKafkaRateController(id,
RateEstimator.create(ssc.conf, context.graph.batchDuration)))
} else {
None
}
}
private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
protected[streaming] def maxMessagesPerPartition(
offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
// calculate a per-partition rate limit based on current lag
val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
case Some(rate) =>
val lagPerPartition = offsets.map { case (tp, offset) =>
tp -> Math.max(offset - currentOffsets(tp), 0)
}
val totalLag = lagPerPartition.values.sum
lagPerPartition.map { case (tp, lag) =>
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
tp -> (if (maxRateLimitPerPartition > 0) {
Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
}
case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
}
if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some(effectiveRateLimitPerPartition.map {
case (tp, limit) => tp -> (secsPerBatch * limit).toLong
})
} else {
None
}
}
/**
* Returns the latest (highest) available offsets, taking new partitions into account.
*/
protected def latestOffsets(): Map[TopicPartition, Long] = {
val c = consumer
c.poll(0)
val parts = c.assignment().asScala
// make sure new partitions are reflected in currentOffsets
val newPartitions = parts.diff(currentOffsets.keySet)
// position for new partitions determined by auto.offset.reset if no commit
currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
// don't want to consume messages, so pause
c.pause(newPartitions.asJava)
// find latest available offsets
c.seekToEnd(currentOffsets.keySet.asJava)
parts.map(tp => tp -> c.position(tp)).toMap
}
// limits the maximum number of messages per partition
protected def clamp(
offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
maxMessagesPerPartition(offsets).map { mmp =>
mmp.map { case (tp, messages) =>
val uo = offsets(tp)
tp -> Math.min(currentOffsets(tp) + messages, uo)
}
}.getOrElse(offsets)
}
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
val untilOffsets = clamp(latestOffsets())
val offsetRanges = untilOffsets.map { case (tp, uo) =>
val fo = currentOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo)
}
val rdd = new KafkaRDD[K, V](
context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
// Report the record number and metadata of this batch interval to InputInfoTracker.
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to an immutable.List to prevent it from being modified by the user
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
currentOffsets = untilOffsets
commitAll()
Some(rdd)
}
override def start(): Unit = {
val c = consumer
c.poll(0)
if (currentOffsets.isEmpty) {
currentOffsets = c.assignment().asScala.map { tp =>
tp -> c.position(tp)
}.toMap
}
// don't actually want to consume any messages, so pause all partitions
c.pause(currentOffsets.keySet.asJava)
}
override def stop(): Unit = this.synchronized {
if (kc != null) {
kc.close()
}
}
protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
protected val commitCallback = new AtomicReference[OffsetCommitCallback]
/**
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
*/
def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
commitAsync(offsetRanges, null)
}
/**
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
* @param callback Only the most recently provided callback will be used at commit.
*/
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
commitCallback.set(callback)
commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
}
protected def commitAll(): Unit = {
val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
val it = commitQueue.iterator()
while (it.hasNext) {
val osr = it.next
val tp = osr.topicPartition
val x = m.get(tp)
val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
m.put(tp, new OffsetAndMetadata(offset))
}
if (!m.isEmpty) {
consumer.commitAsync(m, commitCallback.get)
}
}
private[streaming]
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
}
override def update(time: Time): Unit = {
batchForTime.clear()
generatedRDDs.foreach { kv =>
val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
batchForTime += kv._1 -> a
}
}
override def cleanup(time: Time): Unit = { }
override def restore(): Unit = {
batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> new KafkaRDD[K, V](
context.sparkContext,
executorKafkaParams,
b.map(OffsetRange(_)),
getPreferredHosts,
// during restore, it's possible the same partition will be consumed from multiple
// threads, so don't use the cache
false
)
}
}
}
/**
* A RateController to retrieve the rate from RateEstimator.
*/
private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit = ()
}
}
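A hedged sketch of the commit flow implemented above; stream is assumed to come from KafkaUtils.createDirectStream, and processing of the RDD is elided:

import org.apache.spark.streaming.kafka010.{ CanCommitOffsets, HasOffsetRanges }

// Illustrative only: queue each batch's offset ranges for an async commit back to Kafka.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process rdd here, then queue the ranges; commitAll() sends them during the next batch's compute()
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}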
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
import org.apache.kafka.common.TopicPartition
import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.storage.StorageLevel
/**
* A batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param preferredHosts map from TopicPartition to preferred host for processing that partition.
* In most cases, use [[PreferConsistent]].
* Use [[PreferBrokers]] if your executors are on the same nodes as brokers.
* @param useConsumerCache whether to use a consumer from a per-jvm cache
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
private[spark] class KafkaRDD[K, V](
sc: SparkContext,
val kafkaParams: ju.Map[String, Object],
val offsetRanges: Array[OffsetRange],
val preferredHosts: ju.Map[TopicPartition, String],
useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
assert("none" ==
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
" must be set to none for executor kafka params, else messages may not match offsetRange")
assert(false ==
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
" must be set to false for executor kafka params, else offsets may commit before processing")
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
private val cacheInitialCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
private val cacheMaxCapacity =
conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
private val cacheLoadFactor =
conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
override def persist(newLevel: StorageLevel): this.type = {
logError("Kafka ConsumerRecord is not serializable. " +
"Use .map to extract fields before calling .persist or .window")
super.persist(newLevel)
}
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
}.toArray
}
override def count(): Long = offsetRanges.map(_.count).sum
override def countApprox(
timeout: Long,
confidence: Double = 0.95
): PartialResult[BoundedDouble] = {
val c = count
new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
}
override def isEmpty(): Boolean = count == 0L
override def take(num: Int): Array[ConsumerRecord[K, V]] = {
val nonEmptyPartitions = this.partitions
.map(_.asInstanceOf[KafkaRDDPartition])
.filter(_.count > 0)
if (num < 1 || nonEmptyPartitions.isEmpty) {
return new Array[ConsumerRecord[K, V]](0)
}
// Determine in advance how many messages need to be taken from each partition
val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
val remain = num - result.values.sum
if (remain > 0) {
val taken = Math.min(remain, part.count)
result + (part.index -> taken.toInt)
} else {
result
}
}
val buf = new ArrayBuffer[ConsumerRecord[K, V]]
val res = context.runJob(
this,
(tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
)
res.foreach(buf ++= _)
buf.toArray
}
private def executors(): Array[ExecutorCacheTaskLocation] = {
val bm = sparkContext.env.blockManager
bm.master.getPeers(bm.blockManagerId).toArray
.map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
.sortWith(compareExecutors)
}
protected[kafka010] def compareExecutors(
a: ExecutorCacheTaskLocation,
b: ExecutorCacheTaskLocation): Boolean =
if (a.host == b.host) {
a.executorId > b.executorId
} else {
a.host > b.host
}
/**
* Non-negative modulus, from java 8 math
*/
private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
override def getPreferredLocations(thePart: Partition): Seq[String] = {
// The intention is best-effort consistent executor for a given topicpartition,
// so that caching consumers can be effective.
// TODO what about hosts specified by ip vs name
val part = thePart.asInstanceOf[KafkaRDDPartition]
val allExecs = executors()
val tp = part.topicPartition
val prefHost = preferredHosts.get(tp)
val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
val execs = if (prefExecs.isEmpty) allExecs else prefExecs
if (execs.isEmpty) {
Seq()
} else {
// execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
val index = this.floorMod(tp.hashCode, execs.length)
val chosen = execs(index)
Seq(chosen.toString)
}
}
private def errBeginAfterEnd(part: KafkaRDDPartition): String =
s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
s"for topic ${part.topic} partition ${part.partition}. " +
"You either provided an invalid fromOffset, or the Kafka topic has been damaged"
override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
new KafkaRDDIterator(part, context)
}
}
/**
* An iterator that fetches messages directly from Kafka for the offsets in partition.
* Uses a cached consumer where possible to take advantage of prefetching
*/
private class KafkaRDDIterator(
part: KafkaRDDPartition,
context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
context.addTaskCompletionListener{ context => closeIfNeeded() }
val consumer = if (useConsumerCache) {
CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
if (context.attemptNumber > 1) {
// just in case the prior attempt failures were cache related
CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
}
CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
} else {
CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
}
var requestOffset = part.fromOffset
def closeIfNeeded(): Unit = {
if (!useConsumerCache && consumer != null) {
consumer.close
}
}
override def hasNext(): Boolean = requestOffset < part.untilOffset
override def next(): ConsumerRecord[K, V] = {
assert(hasNext(), "Can't call next() once untilOffset has been reached")
val r = consumer.get(requestOffset, pollTimeout)
requestOffset += 1
r
}
}
}
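A hedged sketch of the batch path over the RDD above, using KafkaUtils.createRDD from later in this patch; sc, the topic, the offsets, and kafkaParams are placeholders:

import java.{ util => ju }

val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "host1:port1,host2:port2")
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("group.id", "example-group")

// Offsets are decided up front: topic, partition, fromOffset (inclusive), untilOffset (exclusive).
val offsetRanges = Array(
  OffsetRange("example-topic", 0, 0L, 100L),
  OffsetRange("example-topic", 1, 0L, 100L))

val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, PreferConsistent)
rdd.map(record => (record.key, record.value)).collect()  // extract fields; ConsumerRecord itself is not serializable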
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import org.apache.kafka.common.TopicPartition
import org.apache.spark.Partition
/**
* @param topic kafka topic name
* @param partition kafka partition id
* @param fromOffset inclusive starting offset
* @param untilOffset exclusive ending offset
*/
private[kafka010]
class KafkaRDDPartition(
val index: Int,
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long
) extends Partition {
/** Number of messages this partition refers to */
def count(): Long = untilOffset - fromOffset
/** Kafka TopicPartition object, for convenience */
def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.io.File
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap, Properties}
import java.util.concurrent.TimeoutException
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.language.postfixOps
import scala.util.control.NonFatal
import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZkUtils
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
/**
* This is a helper class for Kafka test suites. It can set up and tear down
* local Kafka servers, and push data using Kafka producers.
*
* The Kafka test utility class lives in src (rather than test) so that Python-related
* Kafka APIs can be tested against it as well.
*/
private[kafka010] class KafkaTestUtils extends Logging {
// Zookeeper related configurations
private val zkHost = "localhost"
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 6000
private var zookeeper: EmbeddedZookeeper = _
private var zkUtils: ZkUtils = _
// Kafka broker related configurations
private val brokerHost = "localhost"
private var brokerPort = 9092
private var brokerConf: KafkaConfig = _
// Kafka broker server
private var server: KafkaServer = _
// Kafka producer
private var producer: Producer[String, String] = _
// Flag to test whether the system is correctly started
private var zkReady = false
private var brokerReady = false
def zkAddress: String = {
assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
s"$zkHost:$zkPort"
}
def brokerAddress: String = {
assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
s"$brokerHost:$brokerPort"
}
def zookeeperClient: ZkUtils = {
assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
Option(zkUtils).getOrElse(
throw new IllegalStateException("Zookeeper client is not yet initialized"))
}
// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
zkReady = true
}
// Set up the Embedded Kafka server
private def setupEmbeddedKafkaServer(): Unit = {
assert(zkReady, "Zookeeper should be set up beforehand")
// Kafka broker startup
Utils.startServiceOnPort(brokerPort, port => {
brokerPort = port
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
(server, port)
}, new SparkConf(), "KafkaBroker")
brokerReady = true
}
/** Set up all the embedded servers, including Zookeeper and Kafka brokers */
def setup(): Unit = {
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
}
/** Tear down all the servers, including the Kafka broker and Zookeeper */
def teardown(): Unit = {
brokerReady = false
zkReady = false
if (producer != null) {
producer.close()
producer = null
}
if (server != null) {
server.shutdown()
server = null
}
brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
if (zkUtils != null) {
zkUtils.close()
zkUtils = null
}
if (zookeeper != null) {
zookeeper.shutdown()
zookeeper = null
}
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
def createTopic(topic: String, partitions: Int): Unit = {
AdminUtils.createTopic(zkUtils, topic, partitions, 1)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
}
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
def createTopic(topic: String): Unit = {
createTopic(topic, 1)
}
/** Java-friendly function for sending messages to the Kafka broker */
def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
}
/** Send the messages to the Kafka broker */
def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
sendMessages(topic, messages)
}
/** Send the array of messages to the Kafka broker */
def sendMessages(topic: String, messages: Array[String]): Unit = {
producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
producer.close()
producer = null
}
private def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", brokerPort.toString)
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props
}
private def producerConfiguration: Properties = {
val props = new Properties()
props.put("metadata.broker.list", brokerAddress)
props.put("serializer.class", classOf[StringEncoder].getName)
// wait for all in-sync replicas to ack sends
props.put("request.required.acks", "-1")
props
}
// A simplified version of scalatest's "eventually", rewritten here to avoid adding an extra test
// dependency
def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
def makeAttempt(): Either[Throwable, T] = {
try {
Right(func)
} catch {
case e if NonFatal(e) => Left(e)
}
}
val startTime = System.currentTimeMillis()
@tailrec
def tryAgain(attempt: Int): T = {
makeAttempt() match {
case Right(result) => result
case Left(e) =>
val duration = System.currentTimeMillis() - startTime
if (duration < timeout.milliseconds) {
Thread.sleep(interval.milliseconds)
} else {
throw new TimeoutException(e.getMessage)
}
tryAgain(attempt + 1)
}
}
tryAgain(1)
}
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
leaderAndInSyncReplicas.isr.size >= 1
case _ =>
false
}
eventually(Time(10000), Time(100)) {
assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
private class EmbeddedZookeeper(val zkConnect: String) {
val snapshotDir = Utils.createTempDir()
val logDir = Utils.createTempDir()
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
(splits(0), splits(1).toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
factory.startup(zookeeper)
val actualPort = factory.getLocalPort
def shutdown() {
factory.shutdown()
Utils.deleteRecursively(snapshotDir)
Utils.deleteRecursively(logDir)
}
}
}
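A hedged sketch of the lifecycle the suites in this package use this helper with; the topic name and message counts are placeholders:

// Illustrative only: bring up embedded Zookeeper + Kafka, publish test data, tear everything down.
val kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
kafkaTestUtils.createTopic("test-topic", 2)
kafkaTestUtils.sendMessages("test-topic", Map("a" -> 3, "b" -> 2))  // "a" three times, "b" twice
// point a consumer or stream at kafkaTestUtils.brokerAddress and assert on what arrives
kafkaTestUtils.teardown()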
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.api.java.function.{ Function0 => JFunction0 }
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
import org.apache.spark.streaming.dstream._
/**
* :: Experimental ::
* Companion object for constructing Kafka streams and RDDs
*/
@Experimental
object KafkaUtils extends Logging {
/**
* :: Experimental ::
* Scala constructor for a batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createRDD[K, V](
sc: SparkContext,
kafkaParams: ju.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]] = {
val preferredHosts = locationStrategy match {
case PreferBrokers =>
throw new AssertionError(
"If you want to prefer brokers, you must provide a mapping using PreferFixed " +
"A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
case PreferFixed(hostMap) => hostMap
}
val kp = new ju.HashMap[String, Object](kafkaParams)
fixKafkaParams(kp)
val osr = offsetRanges.clone()
new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
}
/**
* :: Experimental ::
* Java constructor for a batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a>. Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createRDD[K, V](
jsc: JavaSparkContext,
kafkaParams: ju.Map[String, Object],
offsetRanges: Array[OffsetRange],
locationStrategy: LocationStrategy
): JavaRDD[ConsumerRecord[K, V]] = {
new JavaRDD(createRDD[K, V](jsc.sc, kafkaParams, offsetRanges, locationStrategy))
}
/**
* :: Experimental ::
* Scala constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
* of messages per second that each '''partition''' will accept.
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]] = {
new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
}
/**
* :: Experimental ::
* Java constructor for a DStream where
* each given Kafka topic/partition corresponds to an RDD partition.
* @param keyClass Class of the keys in the Kafka records
* @param valueClass Class of the values in the Kafka records
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
* see [[LocationStrategy]] for more details.
* @param consumerStrategy In most cases, pass in [[Subscribe]],
* see [[ConsumerStrategy]] for more details
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
*/
@Experimental
def createDirectStream[K, V](
jssc: JavaStreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): JavaInputDStream[ConsumerRecord[K, V]] = {
new JavaInputDStream(
createDirectStream[K, V](
jssc.ssc, locationStrategy, consumerStrategy))
}
/**
* Tweak kafka params to prevent issues on executors
*/
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
// driver and executor should be in different consumer groups
val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
// possible workaround for KAFKA-3135
val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
}
}
}
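A minimal end-to-end sketch (not part of this patch) wiring the entry points above into a small driver; the broker address, topic, group id, and local master are placeholders.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DirectStreamSketch")
    val ssc = new StreamingContext(conf, Milliseconds(500))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",              // assumed broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "direct-stream-sketch",                 // illustrative group id
      "auto.offset.reset" -> "earliest")
    // PreferConsistent spreads partitions evenly across executors; Subscribe hands the
    // topic list to the new consumer API on the driver.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](List("example-topic"), kafkaParams))
    stream.map(r => (r.key, r.value)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}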
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition
import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Choice of how to schedule consumers for a given TopicPartition on an executor.
* Kafka 0.10 consumers prefetch messages, so it's important for performance
* to keep cached consumers on appropriate executors, not recreate them for every partition.
* Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
*/
@Experimental
sealed trait LocationStrategy
/**
* :: Experimental ::
* Use this only if your executors are on the same nodes as your Kafka brokers.
*/
@Experimental
case object PreferBrokers extends LocationStrategy {
def create: PreferBrokers.type = this
}
/**
* :: Experimental ::
* Use this in most cases; it will consistently distribute partitions across all executors.
*/
@Experimental
case object PreferConsistent extends LocationStrategy {
def create: PreferConsistent.type = this
}
/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
/**
* :: Experimental ::
* Use this to place particular TopicPartitions on particular hosts if your load is uneven.
* Any TopicPartition not specified in the map will use a consistent location.
*/
@Experimental
object PreferFixed {
def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
}
def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
PreferFixed(hostMap)
}
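A short sketch (not part of this patch) of constructing each strategy above from Scala; the topic and host names are placeholders.
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{LocationStrategy, PreferBrokers, PreferConsistent, PreferFixed}

// Default: distribute partitions evenly across the available executors.
val consistent: LocationStrategy = PreferConsistent
// Only when executors are co-located with the Kafka brokers.
val onBrokers: LocationStrategy = PreferBrokers
// Pin hot partitions to specific hosts; unlisted partitions fall back to consistent placement.
val fixed: LocationStrategy = PreferFixed(Map(
  new TopicPartition("example-topic", 0) -> "host-a",
  new TopicPartition("example-topic", 1) -> "host-b"))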
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.spark.annotation.Experimental
/**
* Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
* offset ranges in RDDs generated by the direct Kafka DStream (see
* [[KafkaUtils.createDirectStream]]).
* {{{
* KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
* val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
* ...
* }
* }}}
*/
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}
/**
* :: Experimental ::
* Represents any object that can commit a collection of [[OffsetRange]]s.
* The direct Kafka DStream implements this interface (see
* [[KafkaUtils.createDirectStream]]).
* {{{
* val stream = KafkaUtils.createDirectStream(...)
* ...
* stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
* def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
* if (null != e) {
* // error
* } else {
* // success
* }
* }
* })
* }}}
*/
@Experimental
trait CanCommitOffsets {
/**
* :: Experimental ::
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* This is only needed if you intend to store offsets in Kafka, instead of your own store.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
*/
@Experimental
def commitAsync(offsetRanges: Array[OffsetRange]): Unit
/**
* :: Experimental ::
* Queue up offset ranges for commit to Kafka at a future time. Threadsafe.
* This is only needed if you intend to store offsets in Kafka, instead of your own store.
* @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
* @param callback Only the most recently provided callback will be used at commit.
*/
@Experimental
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
}
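Putting HasOffsetRanges and CanCommitOffsets together, a hedged Scala sketch (not part of this patch); `stream` is assumed to be the InputDStream returned by KafkaUtils.createDirectStream.
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

stream.foreachRDD { rdd =>
  // Read the offset ranges that back this batch's RDD partitions.
  val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here, then queue the offsets for an async commit back to Kafka ...
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback {
    override def onComplete(m: JMap[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
      if (e != null) println(s"offset commit failed: ${e.getMessage}")
    }
  })
}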
/**
* Represents a range of offsets from a single Kafka TopicPartition. Instances of this class
* can be created with `OffsetRange.create()`.
* @param topic Kafka topic name
* @param partition Kafka partition id
* @param fromOffset Inclusive starting offset
* @param untilOffset Exclusive ending offset
*/
final class OffsetRange private(
val topic: String,
val partition: Int,
val fromOffset: Long,
val untilOffset: Long) extends Serializable {
import OffsetRange.OffsetRangeTuple
/** Kafka TopicPartition object, for convenience */
def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
/** Number of messages this OffsetRange refers to */
def count(): Long = untilOffset - fromOffset
override def equals(obj: Any): Boolean = obj match {
case that: OffsetRange =>
this.topic == that.topic &&
this.partition == that.partition &&
this.fromOffset == that.fromOffset &&
this.untilOffset == that.untilOffset
case _ => false
}
override def hashCode(): Int = {
toTuple.hashCode()
}
override def toString(): String = {
s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
}
/** this is to avoid ClassNotFoundException during checkpoint restore */
private[streaming]
def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
}
/**
* Companion object that provides methods to create instances of [[OffsetRange]].
*/
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)
def create(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long): OffsetRange =
new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)
def apply(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long): OffsetRange =
new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
/** this is to avoid ClassNotFoundException during checkpoint restore */
private[kafka010]
type OffsetRangeTuple = (String, Int, Long, Long)
private[kafka010]
def apply(t: OffsetRangeTuple) =
new OffsetRange(t._1, t._2, t._3, t._4)
}
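A minimal batch-read sketch (not part of this patch) tying OffsetRange to KafkaUtils.createRDD; the SparkContext, broker address, topic, and offset bounds are assumed.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange, PreferConsistent}

def readBatch(sc: SparkContext): Array[String] = {
  val kafkaParams = new java.util.HashMap[String, Object]()
  kafkaParams.put("bootstrap.servers", "localhost:9092")            // assumed broker
  kafkaParams.put("key.deserializer", classOf[StringDeserializer])
  kafkaParams.put("value.deserializer", classOf[StringDeserializer])
  kafkaParams.put("group.id", "offset-range-sketch")                // illustrative group id
  // Offsets [0, 100) of partition 0; the range is assumed to exist in the topic.
  val ranges = Array(OffsetRange("example-topic", 0, 0, 100))
  KafkaUtils.createRDD[String, String](sc, kafkaParams, ranges, PreferConsistent)
    .map(_.value)
    .collect()
}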
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Spark Integration for Kafka 0.10
*/
package org.apache.spark.streaming.kafka010;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming
/**
* Spark Integration for Kafka 0.10
*/
package object kafka010
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
import scala.collection.JavaConverters;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;
public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
final TopicPartition tp1 = new TopicPartition(topic1, 0);
final TopicPartition tp2 = new TopicPartition(topic1, 1);
final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
final scala.collection.Iterable<TopicPartition> sParts =
JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
final Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "not used");
final scala.collection.Map<String, Object> sKafkaParams =
JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
final Map<TopicPartition, Object> offsets = new HashMap<>();
offsets.put(tp1, 23L);
final scala.collection.Map<TopicPartition, Object> sOffsets =
JavaConverters.mapAsScalaMapConverter(offsets).asScala();
// make sure constructors can be called from java
final ConsumerStrategy<String, String> sub0 =
Subscribe.<String, String>apply(topics, kafkaParams, offsets);
final ConsumerStrategy<String, String> sub1 =
Subscribe.<String, String>apply(sTopics, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> sub2 =
Subscribe.<String, String>apply(sTopics, sKafkaParams);
final ConsumerStrategy<String, String> sub3 =
Subscribe.<String, String>create(topics, kafkaParams, offsets);
final ConsumerStrategy<String, String> sub4 =
Subscribe.<String, String>create(topics, kafkaParams);
Assert.assertEquals(
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
final ConsumerStrategy<String, String> asn0 =
Assign.<String, String>apply(parts, kafkaParams, offsets);
final ConsumerStrategy<String, String> asn1 =
Assign.<String, String>apply(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
Assign.<String, String>apply(sParts, sKafkaParams);
final ConsumerStrategy<String, String> asn3 =
Assign.<String, String>create(parts, kafkaParams, offsets);
final ConsumerStrategy<String, String> asn4 =
Assign.<String, String>create(parts, kafkaParams);
Assert.assertEquals(
asn1.executorKafkaParams().get("bootstrap.servers"),
asn3.executorKafkaParams().get("bootstrap.servers"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class JavaDirectKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private transient KafkaTestUtils kafkaTestUtils = null;
@Before
public void setUp() {
kafkaTestUtils = new KafkaTestUtils();
kafkaTestUtils.setup();
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
}
@After
public void tearDown() {
if (ssc != null) {
ssc.stop();
ssc = null;
}
if (kafkaTestUtils != null) {
kafkaTestUtils.teardown();
kafkaTestUtils = null;
}
}
@Test
public void testKafkaStream() throws InterruptedException {
final String topic1 = "topic1";
final String topic2 = "topic2";
// hold a reference to the current offset ranges, so it can be used downstream
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
Set<String> sent = new HashSet<>();
sent.addAll(Arrays.asList(topic1data));
sent.addAll(Arrays.asList(topic2data));
Random random = new Random();
final Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt() +
"-" + System.currentTimeMillis());
JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
ssc,
PreferConsistent.create(),
Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams)
);
JavaDStream<String> stream1 = istream1.transform(
// Make sure you can get offset ranges from the rdd
new Function<JavaRDD<ConsumerRecord<String, String>>,
JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public JavaRDD<ConsumerRecord<String, String>> call(
JavaRDD<ConsumerRecord<String, String>> rdd
) {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
Assert.assertEquals(topic1, offsets[0].topic());
return rdd;
}
}
).map(
new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> r) {
return r.value();
}
}
);
final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
kafkaParams2.put("group.id", "java-test-consumer-" + random.nextInt() +
"-" + System.currentTimeMillis());
JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
ssc,
PreferConsistent.create(),
Subscribe.<String, String>create(Arrays.asList(topic2), kafkaParams2)
);
JavaDStream<String> stream2 = istream2.transform(
// Make sure you can get offset ranges from the rdd
new Function<JavaRDD<ConsumerRecord<String, String>>,
JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public JavaRDD<ConsumerRecord<String, String>> call(
JavaRDD<ConsumerRecord<String, String>> rdd
) {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
Assert.assertEquals(topic2, offsets[0].topic());
return rdd;
}
}
).map(
new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> r) {
return r.value();
}
}
);
JavaDStream<String> unifiedStream = stream1.union(stream2);
final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
result.addAll(rdd.collect());
}
}
);
ssc.start();
long startTime = System.currentTimeMillis();
boolean matches = false;
while (!matches && System.currentTimeMillis() - startTime < 20000) {
matches = sent.size() == result.size();
Thread.sleep(50);
}
Assert.assertEquals(sent, result);
ssc.stop();
}
private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
kafkaTestUtils.createTopic(topic);
kafkaTestUtils.sendMessages(topic, data);
return data;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
public class JavaKafkaRDDSuite implements Serializable {
private transient JavaSparkContext sc = null;
private transient KafkaTestUtils kafkaTestUtils = null;
@Before
public void setUp() {
kafkaTestUtils = new KafkaTestUtils();
kafkaTestUtils.setup();
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
sc = new JavaSparkContext(sparkConf);
}
@After
public void tearDown() {
if (sc != null) {
sc.stop();
sc = null;
}
if (kafkaTestUtils != null) {
kafkaTestUtils.teardown();
kafkaTestUtils = null;
}
}
@Test
public void testKafkaRDD() throws InterruptedException {
String topic1 = "topic1";
String topic2 = "topic2";
createTopicAndSendData(topic1);
createTopicAndSendData(topic2);
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
OffsetRange.create(topic2, 0, 0, 1)
};
Map<TopicPartition, String> leaders = new HashMap<>();
String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
String broker = hostAndPort[0];
leaders.put(offsetRanges[0].topicPartition(), broker);
leaders.put(offsetRanges[1].topicPartition(), broker);
Function<ConsumerRecord<String, String>, String> handler =
new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> r) {
return r.value();
}
};
JavaRDD<String> rdd1 = KafkaUtils.<String, String>createRDD(
sc,
kafkaParams,
offsetRanges,
PreferFixed.create(leaders)
).map(handler);
JavaRDD<String> rdd2 = KafkaUtils.<String, String>createRDD(
sc,
kafkaParams,
offsetRanges,
PreferConsistent.create()
).map(handler);
// just making sure the java user apis work; the scala tests handle logic corner cases
long count1 = rdd1.count();
long count2 = rdd2.count();
Assert.assertTrue(count1 > 0);
Assert.assertEquals(count1, count2);
}
private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
kafkaTestUtils.createTopic(topic);
kafkaTestUtils.sendMessages(topic, data);
return data;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
import scala.collection.JavaConverters;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;
public class JavaLocationStrategySuite implements Serializable {
@Test
public void testLocationStrategyConstructors() {
final String topic1 = "topic1";
final TopicPartition tp1 = new TopicPartition(topic1, 0);
final TopicPartition tp2 = new TopicPartition(topic1, 1);
final Map<TopicPartition, String> hosts = new HashMap<>();
hosts.put(tp1, "node1");
hosts.put(tp2, "node2");
final scala.collection.Map<TopicPartition, String> sHosts =
JavaConverters.mapAsScalaMapConverter(hosts).asScala();
// make sure constructors can be called from java
final LocationStrategy c1 = PreferConsistent.create();
final LocationStrategy c2 = PreferConsistent$.MODULE$;
Assert.assertEquals(c1, c2);
final LocationStrategy c3 = PreferBrokers.create();
final LocationStrategy c4 = PreferBrokers$.MODULE$;
Assert.assertEquals(c3, c4);
final LocationStrategy c5 = PreferFixed.create(hosts);
final LocationStrategy c6 = PreferFixed.apply(sHosts);
Assert.assertEquals(c5, c6);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.io.File
import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.concurrent.Eventually
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.Utils
class DirectKafkaStreamSuite
extends SparkFunSuite
with BeforeAndAfter
with BeforeAndAfterAll
with Eventually
with Logging {
val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
private var sc: SparkContext = _
private var ssc: StreamingContext = _
private var testDir: File = _
private var kafkaTestUtils: KafkaTestUtils = _
override def beforeAll {
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
}
override def afterAll {
if (kafkaTestUtils != null) {
kafkaTestUtils.teardown()
kafkaTestUtils = null
}
}
after {
if (ssc != null) {
ssc.stop()
sc = null
}
if (sc != null) {
sc.stop()
}
if (testDir != null) {
Utils.deleteRecursively(testDir)
}
}
def getKafkaParams(extra: (String, Object)*): JHashMap[String, Object] = {
val kp = new JHashMap[String, Object]()
kp.put("bootstrap.servers", kafkaTestUtils.brokerAddress)
kp.put("key.deserializer", classOf[StringDeserializer])
kp.put("value.deserializer", classOf[StringDeserializer])
kp.put("group.id", s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}")
extra.foreach(e => kp.put(e._1, e._2))
kp
}
val preferredHosts = PreferConsistent
test("basic stream receiving with multiple topics and smallest starting offset") {
val topics = List("basic1", "basic2", "basic3")
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
val totalSent = data.values.sum * topics.size
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc, preferredHosts, Subscribe[String, String](topics, kafkaParams.asScala))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()
// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
val tf = stream.transform { rdd =>
// Get the offset ranges in the RDD
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map(r => (r.key, r.value))
}
tf.foreachRDD { rdd =>
for (o <- offsetRanges) {
logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
// For each partition, get size of the range in the partition,
// and the number of items in the partition
val off = offsetRanges(i)
val all = iter.toSeq
val partSize = all.size
val rangeSize = off.untilOffset - off.fromOffset
Iterator((partSize, rangeSize))
}.collect
// Verify whether number of elements in each partition
// matches with the corresponding offset range
collected.foreach { case (partSize, rangeSize) =>
assert(partSize === rangeSize, "offset ranges are wrong")
}
}
stream.foreachRDD { rdd =>
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
test("receiving from largest starting offset") {
val topic = "latest"
val topicPartition = new TopicPartition(topic, 0)
val data = Map("a" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = getKafkaParams("auto.offset.reset" -> "latest")
val kc = new KafkaConsumer(kafkaParams)
kc.assign(Arrays.asList(topicPartition))
def getLatestOffset(): Long = {
kc.seekToEnd(Arrays.asList(topicPartition))
kc.position(topicPartition)
}
// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
assert(getLatestOffset() > 3)
}
val offsetBeforeStart = getLatestOffset()
kc.close()
// Setup context and kafka stream with largest offset
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](
ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
s.consumer.poll(0)
assert(
s.consumer.position(topicPartition) >= offsetBeforeStart,
"Start offset not from latest"
)
s
}
val collectedData = new ConcurrentLinkedQueue[String]()
stream.map { _.value }.foreachRDD { rdd =>
collectedData.addAll(Arrays.asList(rdd.collect(): _*))
}
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
}
test("creating stream by offset") {
val topic = "offset"
val topicPartition = new TopicPartition(topic, 0)
val data = Map("a" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = getKafkaParams("auto.offset.reset" -> "latest")
val kc = new KafkaConsumer(kafkaParams)
kc.assign(Arrays.asList(topicPartition))
def getLatestOffset(): Long = {
kc.seekToEnd(Arrays.asList(topicPartition))
kc.position(topicPartition)
}
// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
assert(getLatestOffset() >= 10)
}
val offsetBeforeStart = getLatestOffset()
kc.close()
// Setup context and kafka stream with largest offset
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](ssc, preferredHosts,
Assign[String, String](
List(topicPartition),
kafkaParams.asScala,
Map(topicPartition -> 11L)))
s.consumer.poll(0)
assert(
s.consumer.position(topicPartition) >= offsetBeforeStart,
"Start offset not from latest"
)
s
}
val collectedData = new ConcurrentLinkedQueue[String]()
stream.map(_.value).foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
}
// Test to verify the offset ranges can be recovered from the checkpoints
test("offset recovery") {
val topic = "recovery"
kafkaTestUtils.createTopic(topic)
testDir = Utils.createTempDir()
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
// Send data to Kafka
def sendData(data: Seq[Int]) {
val strings = data.map { _.toString}
kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
}
// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(100))
val kafkaStream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
}
val keyedStream = kafkaStream.map { r => "key" -> r.value.toInt }
val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
Some(values.sum + state.getOrElse(0))
}
ssc.checkpoint(testDir.getAbsolutePath)
// This is to ensure all the data is eventually received only once
stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
rdd.collect().headOption.foreach { x =>
DirectKafkaStreamSuite.total.set(x._2)
}
}
ssc.start()
// Send some data
for (i <- (1 to 10).grouped(4)) {
sendData(i)
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
ssc.stop()
// Verify that offset ranges were generated
val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
assert(
offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
"starting offset not zero"
)
logInfo("====== RESTARTING ========")
// Recover context from checkpoints
ssc = new StreamingContext(testDir.getAbsolutePath)
val recoveredStream =
ssc.graph.getInputStreams().head.asInstanceOf[DStream[ConsumerRecord[String, String]]]
// Verify offset ranges have been recovered
val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
val earlierOffsetRanges = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
assert(
recoveredOffsetRanges.forall { or =>
earlierOffsetRanges.contains((or._1, or._2))
},
"Recovered ranges are not the same as the ones generated\n" +
earlierOffsetRanges + "\n" + recoveredOffsetRanges
)
// Restart context, give more data and verify the total at the end
// If the total is right, that means each record has been received only once
ssc.start()
for (i <- (11 to 20).grouped(4)) {
sendData(i)
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()
}
// Test to verify the offsets can be recovered from Kafka
test("offset recovery from kafka") {
val topic = "recoveryfromkafka"
kafkaTestUtils.createTopic(topic)
val kafkaParams = getKafkaParams(
"auto.offset.reset" -> "earliest",
("enable.auto.commit", false: java.lang.Boolean)
)
val collectedData = new ConcurrentLinkedQueue[String]()
val committed = new JHashMap[TopicPartition, OffsetAndMetadata]()
// Send data to Kafka and wait for it to be received
def sendDataAndWaitForReceive(data: Seq[Int]) {
val strings = data.map { _.toString}
kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
assert(strings.forall { collectedData.contains })
}
}
// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(100))
withClue("Error creating direct stream") {
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
kafkaStream.foreachRDD { (rdd: RDD[ConsumerRecord[String, String]], time: Time) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val data = rdd.map(_.value).collect()
collectedData.addAll(Arrays.asList(data: _*))
kafkaStream.asInstanceOf[CanCommitOffsets]
.commitAsync(offsets, new OffsetCommitCallback() {
def onComplete(m: JMap[TopicPartition, OffsetAndMetadata], e: Exception) {
if (null != e) {
logError("commit failed", e)
} else {
committed.putAll(m)
}
}
})
}
}
ssc.start()
// Send some data and wait for them to be received
for (i <- (1 to 10).grouped(4)) {
sendDataAndWaitForReceive(i)
}
ssc.stop()
assert(! committed.isEmpty)
val consumer = new KafkaConsumer[String, String](kafkaParams)
consumer.subscribe(Arrays.asList(topic))
consumer.poll(0)
committed.asScala.foreach {
case (k, v) =>
// commits are async, not exactly once
assert(v.offset > 0)
assert(consumer.position(k) >= v.offset)
}
}
test("Direct Kafka stream report input information") {
val topic = "report-test"
val data = Map("a" -> 7, "b" -> 9)
kafkaTestUtils.createTopic(topic)
kafkaTestUtils.sendMessages(topic, data)
val totalSent = data.values.sum
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
import DirectKafkaStreamSuite._
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val collector = new InputInfoCollector
ssc.addStreamingListener(collector)
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]
stream.map(r => (r.key, r.value))
.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
// Calculate all the record number collected in the StreamingListener.
assert(collector.numRecordsSubmitted.get() === totalSent)
assert(collector.numRecordsStarted.get() === totalSent)
assert(collector.numRecordsCompleted.get() === totalSent)
}
ssc.stop()
}
test("maxMessagesPerPartition with backpressure disabled") {
val topic = "maxMessagesPerPartition"
val kafkaStream = getDirectKafkaStream(topic, None)
val input = Map(new TopicPartition(topic, 0) -> 50L, new TopicPartition(topic, 1) -> 50L)
assert(kafkaStream.maxMessagesPerPartition(input).get ==
Map(new TopicPartition(topic, 0) -> 10L, new TopicPartition(topic, 1) -> 10L))
}
test("maxMessagesPerPartition with no lag") {
val topic = "maxMessagesPerPartition"
val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
val kafkaStream = getDirectKafkaStream(topic, rateController)
val input = Map(new TopicPartition(topic, 0) -> 0L, new TopicPartition(topic, 1) -> 0L)
assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
}
test("maxMessagesPerPartition respects max rate") {
val topic = "maxMessagesPerPartition"
val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
val kafkaStream = getDirectKafkaStream(topic, rateController)
val input = Map(new TopicPartition(topic, 0) -> 1000L, new TopicPartition(topic, 1) -> 1000L)
assert(kafkaStream.maxMessagesPerPartition(input).get ==
Map(new TopicPartition(topic, 0) -> 10L, new TopicPartition(topic, 1) -> 10L))
}
test("using rate controller") {
val topic = "backpressure"
val topicPartitions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
kafkaTestUtils.createTopic(topic, 2)
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
val executorKafkaParams = new JHashMap[String, Object](kafkaParams)
KafkaUtils.fixKafkaParams(executorKafkaParams)
val batchIntervalMilliseconds = 100
val estimator = new ConstantEstimator(100)
val messages = Map("foo" -> 200)
kafkaTestUtils.sendMessages(topic, messages)
val sparkConf = new SparkConf()
// Safe, even with streaming, because we're using the direct API.
// Using 1 core is useful to make the test more predictable.
.setMaster("local[1]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.kafka.maxRatePerPartition", "100")
// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
val kafkaStream = withClue("Error creating direct stream") {
new DirectKafkaInputDStream[String, String](
ssc, preferredHosts, Subscribe[String, String](List(topic), kafkaParams.asScala)) {
override protected[streaming] val rateController =
Some(new DirectKafkaRateController(id, estimator))
}.map(r => (r.key, r.value))
}
val collectedData = new ConcurrentLinkedQueue[Array[String]]()
// Used for assertion failure messages.
def dataToString: String =
collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
// This is to collect the raw data received from Kafka
kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
val data = rdd.map { _._2 }.collect()
collectedData.add(data)
}
ssc.start()
// Try different rate limits.
// Wait for arrays of data to appear matching the rate.
Seq(100, 50, 20).foreach { rate =>
collectedData.clear() // Empty this buffer on each pass.
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.asScala.exists(_.size == expectedSize),
s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
}
}
ssc.stop()
}
/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
kafkaStream.generatedRDDs.mapValues { rdd =>
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}.toSeq.sortBy { _._1 }
}
private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
val batchIntervalMilliseconds = 100
val sparkConf = new SparkConf()
.setMaster("local[1]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.kafka.maxRatePerPartition", "100")
// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
val ekp = new JHashMap[String, Object](kafkaParams)
KafkaUtils.fixKafkaParams(ekp)
val s = new DirectKafkaInputDStream[String, String](
ssc,
preferredHosts,
new ConsumerStrategy[String, String] {
def executorKafkaParams = ekp
def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[String, String] = {
val consumer = new KafkaConsumer[String, String](kafkaParams)
val tps = List(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
consumer.assign(Arrays.asList(tps: _*))
tps.foreach(tp => consumer.seek(tp, 0))
consumer
}
}
) {
override protected[streaming] val rateController = mockRateController
}
// manual start necessary because we aren't consuming the stream, just checking its state
s.start()
s
}
}
object DirectKafkaStreamSuite {
val total = new AtomicLong(-1L)
class InputInfoCollector extends StreamingListener {
val numRecordsSubmitted = new AtomicLong(0L)
val numRecordsStarted = new AtomicLong(0L)
val numRecordsCompleted = new AtomicLong(0L)
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
}
}
}
private[streaming] class ConstantEstimator(@volatile private var rate: Long)
extends RateEstimator {
def updateRate(newRate: Long): Unit = {
rate = newRate
}
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}
private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit = ()
override def getLatestRate(): Long = rate
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kafka010
import java.{ util => ju }
import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.scalatest.BeforeAndAfterAll
import org.apache.spark._
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
private var kafkaTestUtils: KafkaTestUtils = _
private val sparkConf = new SparkConf().setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
private var sc: SparkContext = _
override def beforeAll {
sc = new SparkContext(sparkConf)
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
}
override def afterAll {
if (sc != null) {
sc.stop
sc = null
}
if (kafkaTestUtils != null) {
kafkaTestUtils.teardown()
kafkaTestUtils = null
}
}
private def getKafkaParams() = Map[String, Object](
"bootstrap.servers" -> kafkaTestUtils.brokerAddress,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> s"test-consumer-${Random.nextInt}-${System.currentTimeMillis}"
).asJava
private val preferredHosts = PreferConsistent
test("basic usage") {
val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}"
kafkaTestUtils.createTopic(topic)
val messages = Array("the", "quick", "brown", "fox")
kafkaTestUtils.sendMessages(topic, messages)
val kafkaParams = getKafkaParams()
val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts)
.map(_.value)
val received = rdd.collect.toSet
assert(received === messages.toSet)
// size-related method optimizations return sane results
assert(rdd.count === messages.size)
assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
assert(!rdd.isEmpty)
assert(rdd.take(1).size === 1)
assert(rdd.take(1).head === messages.head)
assert(rdd.take(messages.size + 10).size === messages.size)
val emptyRdd = KafkaUtils.createRDD[String, String](
sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts)
assert(emptyRdd.isEmpty)
// invalid offset ranges throw exceptions
val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
intercept[SparkException] {
val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts)
.map(_.value)
.collect()
}
}
test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = getKafkaParams()
// this is the "lots of messages" case
kafkaTestUtils.sendMessages(topic, sent)
var sentCount = sent.values.sum
val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams,
Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts)
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
assert(rangeCount === sentCount, "offset range didn't include all sent messages")
assert(rdd.map(_.offset).collect.sorted === (0 until sentCount).toArray,
"didn't get all sent messages")
// this is the "0 messages" case
val rdd2 = KafkaUtils.createRDD[String, String](sc, kafkaParams,
Array(OffsetRange(topic, 0, sentCount, sentCount)), preferredHosts)
// shouldn't get anything, since message is sent after rdd was defined
val sentOnlyOne = Map("d" -> 1)
kafkaTestUtils.sendMessages(topic, sentOnlyOne)
assert(rdd2.map(_.value).collect.size === 0, "got messages when there shouldn't be any")
// this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
val rdd3 = KafkaUtils.createRDD[String, String](sc, kafkaParams,
Array(OffsetRange(topic, 0, sentCount, sentCount + 1)), preferredHosts)
// send lots of messages after rdd was defined, they shouldn't show up
kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
assert(rdd3.map(_.value).collect.head === sentOnlyOne.keys.head,
"didn't get exactly one message")
}
test("executor sorting") {
val kafkaParams = new ju.HashMap[String, Object](getKafkaParams())
kafkaParams.put("auto.offset.reset", "none")
val rdd = new KafkaRDD[String, String](
sc,
kafkaParams,
Array(OffsetRange("unused", 0, 1, 2)),
ju.Collections.emptyMap[TopicPartition, String](),
true)
val a3 = ExecutorCacheTaskLocation("a", "3")
val a4 = ExecutorCacheTaskLocation("a", "4")
val b1 = ExecutorCacheTaskLocation("b", "1")
val b2 = ExecutorCacheTaskLocation("b", "2")
val correct = Array(b2, b1, a4, a3)
correct.permutations.foreach { p =>
assert(p.sortWith(rdd.compareExecutors) === correct)
}
}
}