Commit fd6e8f0e authored by cody koeninger, committed by Tathagata Das

[SPARK-13569][STREAMING][KAFKA] pattern based topic subscription

## What changes were proposed in this pull request?
Allow Kafka topic subscriptions based on a regex pattern.
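
A minimal sketch of how the new `SubscribePattern` strategy can be wired into a direct stream. The broker address, group id, and the helper name `patternStream` are illustrative only; the `StreamingContext` and `LocationStrategy` are assumed to come from the caller's existing streaming setup, as in `DirectKafkaStreamSuite`:

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._

// Build a direct stream over every topic whose name matches "pat\d".
// `ssc` and `locationStrategy` are supplied by the caller's existing setup.
def patternStream(
    ssc: StreamingContext,
    locationStrategy: LocationStrategy): DStream[(String, String)] = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", // assumption: local broker
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "pattern-example",
    "auto.offset.reset" -> "earliest")

  // Start one matching partition at an explicit offset; all other matching
  // partitions fall back to committed offsets or auto.offset.reset.
  val offsets = Map(new TopicPartition("pat2", 0) -> 3L)

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    locationStrategy,
    ConsumerStrategies.SubscribePattern[String, String](
      Pattern.compile("""pat\d"""), kafkaParams, offsets))

  stream.map(r => (r.key, r.value))
}
```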

## How was this patch tested?
Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #14026 from koeninger/SPARK-13569.
parent 3b22291b
@@ -22,10 +22,11 @@ import java.{ lang => jl, util => ju }
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
/**
* :: Experimental ::
@@ -47,7 +48,9 @@ abstract class ConsumerStrategy[K, V] {
/**
* Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
* See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>.
* This consumer will be used on the driver to query for offsets only, not messages.
* The consumer must be returned in a state that it is safe to call poll(0) on.
* @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
* has successfully read. Will be empty on initial start, possibly non-empty on restart from
* checkpoint.
@@ -72,15 +75,83 @@ private case class Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] {
) extends ConsumerStrategy[K, V] with Logging {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
val toSeek = if (currentOffsets.isEmpty) {
offsets
} else {
currentOffsets
}
if (!toSeek.isEmpty) {
// work around KAFKA-3370 when auto.offset.reset is none
// poll will throw if there is no position, i.e. auto.offset.reset is none and no explicit position,
// but we can't seek to a position before poll, because poll is what gets the subscription's partitions.
// So: poll, suppress the first exception, then seek
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
try {
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
logWarning("Catching NoOffsetForPartitionException since " +
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
}
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
consumer
}
}
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
private case class SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]
) extends ConsumerStrategy[K, V] with Logging {
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
val toSeek = if (currentOffsets.isEmpty) {
offsets
} else {
currentOffsets
}
if (!toSeek.isEmpty) {
// work around KAFKA-3370 when reset is none, see explanation in Subscribe above
val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
try {
consumer.poll(0)
} catch {
case x: NoOffsetForPartitionException if shouldSuppress =>
logWarning("Catching NoOffsetForPartitionException since " +
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
}
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
@@ -113,8 +184,14 @@ private case class Assign[K, V](
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.assign(topicPartitions)
if (currentOffsets.isEmpty) {
offsets.asScala.foreach { case (topicPartition, offset) =>
val toSeek = if (currentOffsets.isEmpty) {
offsets
} else {
currentOffsets
}
if (!toSeek.isEmpty) {
// this doesn't need a KAFKA-3370 workaround, because partitions are known, no poll needed
toSeek.asScala.foreach { case (topicPartition, offset) =>
consumer.seek(topicPartition, offset)
}
}
@@ -215,6 +292,95 @@ object ConsumerStrategies {
new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
}
/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsets: offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](pattern, kafkaParams, offsets)
}
/** :: Experimental ::
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
* @param pattern pattern to subscribe to
* @param kafkaParams Kafka
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
* configuration parameters</a> to be used on driver. The same params will be used on executors,
* with minor automatic modifications applied.
* Requires "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
*/
@Experimental
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
new SubscribePattern[K, V](
pattern,
kafkaParams,
ju.Collections.emptyMap[TopicPartition, jl.Long]())
}
/**
* :: Experimental ::
* Assign a fixed collection of TopicPartitions
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
import java.io.Serializable;
import java.util.*;
import java.util.regex.Pattern;
import scala.collection.JavaConverters;
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
@Test
public void testConsumerStrategyConstructors() {
final String topic1 = "topic1";
final Pattern pat = Pattern.compile("top.*");
final Collection<String> topics = Arrays.asList(topic1);
final scala.collection.Iterable<String> sTopics =
JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
sub1.executorKafkaParams().get("bootstrap.servers"),
sub3.executorKafkaParams().get("bootstrap.servers"));
final ConsumerStrategy<String, String> psub1 =
ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> psub2 =
ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
final ConsumerStrategy<String, String> psub3 =
ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
final ConsumerStrategy<String, String> psub4 =
ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
Assert.assertEquals(
psub1.executorKafkaParams().get("bootstrap.servers"),
psub3.executorKafkaParams().get("bootstrap.servers"));
final ConsumerStrategy<String, String> asn1 =
ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
final ConsumerStrategy<String, String> asn2 =
@@ -103,7 +103,9 @@ class DirectKafkaStreamSuite
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
val totalSent = data.values.sum * topics.size
val offsets = Map(new TopicPartition("basic3", 0) -> 2L)
// one topic is starting 2 messages later
val expectedTotal = (data.values.sum * topics.size) - 2
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
@@ -111,7 +113,7 @@ class DirectKafkaStreamSuite
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala, offsets))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()
@@ -149,13 +151,78 @@ class DirectKafkaStreamSuite
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === totalSent,
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
test("pattern based subscription") {
val topics = List("pat1", "pat2", "advanced3")
// Should match 2 out of 3 topics
val pat = """pat\d""".r.pattern
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
kafkaTestUtils.createTopic(t)
kafkaTestUtils.sendMessages(t, data)
}
val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
// 2 matching topics, one of which starts 3 messages later
val expectedTotal = (data.values.sum * 2) - 3
val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.SubscribePattern[String, String](pat, kafkaParams.asScala, offsets))
}
val allReceived = new ConcurrentLinkedQueue[(String, String)]()
// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
val tf = stream.transform { rdd =>
// Get the offset ranges in the RDD
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.map(r => (r.key, r.value))
}
tf.foreachRDD { rdd =>
for (o <- offsetRanges) {
logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
// For each partition, get size of the range in the partition,
// and the number of items in the partition
val off = offsetRanges(i)
val all = iter.toSeq
val partSize = all.size
val rangeSize = off.untilOffset - off.fromOffset
Iterator((partSize, rangeSize))
}.collect
// Verify whether number of elements in each partition
// matches with the corresponding offset range
collected.foreach { case (partSize, rangeSize) =>
assert(partSize === rangeSize, "offset ranges are wrong")
}
}
stream.foreachRDD { rdd =>
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
}
ssc.stop()
}
test("receiving from largest starting offset") {
val topic = "latest"
val topicPartition = new TopicPartition(topic, 0)
@@ -228,6 +295,7 @@ class DirectKafkaStreamSuite
kc.close()
// Setup context and kafka stream with largest offset
kafkaParams.put("auto.offset.reset", "none")
ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
val s = new DirectKafkaInputDStream[String, String](