From fd6e8f0e2269a2e7f24f79d5c2041816ea308c86 Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Fri, 8 Jul 2016 17:47:58 -0700
Subject: [PATCH] [SPARK-13569][STREAMING][KAFKA] pattern based topic
 subscription

## What changes were proposed in this pull request?
Allow for Kafka topic subscriptions based on a regex pattern.
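
For example, with the new `ConsumerStrategies.SubscribePattern` overloads, a direct stream over every topic matching a regex can be created as in the following minimal sketch (the broker address, group id, application name, and the `events-.*` pattern are illustrative placeholders, not part of this patch):

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object SubscribePatternExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical setup: broker address, group id, and the topic pattern are placeholders.
    val conf = new SparkConf().setAppName("SubscribePatternExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "subscribe-pattern-example",
      "auto.offset.reset" -> "earliest")

    // Subscribe to every topic whose name matches "events-.*"; new matching topics are
    // picked up periodically as the consumer re-checks existing topics against the pattern.
    val pattern = Pattern.compile("events-.*")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams))

    stream.map(r => (r.key, r.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

As with `Subscribe`, overloads taking explicit starting offsets are also provided; partitions without an explicit offset fall back to the committed offset or the `auto.offset.reset` setting.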

## How was this patch tested?
Unit tests, manual tests

Author: cody koeninger <cody@koeninger.org>

Closes #14026 from koeninger/SPARK-13569.
---
 .../streaming/kafka010/ConsumerStrategy.scala | 178 +++++++++++++++++-
 .../kafka010/JavaConsumerStrategySuite.java   |  15 ++
 .../kafka010/DirectKafkaStreamSuite.scala     |  74 +++++++-
 3 files changed, 258 insertions(+), 9 deletions(-)

diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 70c3f1a98d..60255fc655 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -22,10 +22,11 @@ import java.{ lang => jl, util => ju }
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.annotation.Experimental
-
+import org.apache.spark.internal.Logging
 
 /**
  * :: Experimental ::
@@ -47,7 +48,9 @@ abstract class ConsumerStrategy[K, V] {
 
   /**
    * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
+   * See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Kafka docs</a>.
    * This consumer will be used on the driver to query for offsets only, not messages.
+   * The consumer must be returned in a state in which it is safe to call poll(0).
    * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
    * has successfully read.  Will be empty on initial start, possibly non-empty on restart from
    * checkpoint.
@@ -72,15 +75,83 @@ private case class Subscribe[K, V](
     topics: ju.Collection[jl.String],
     kafkaParams: ju.Map[String, Object],
     offsets: ju.Map[TopicPartition, jl.Long]
-  ) extends ConsumerStrategy[K, V] {
+  ) extends ConsumerStrategy[K, V] with Logging {
 
   def executorKafkaParams: ju.Map[String, Object] = kafkaParams
 
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
     val consumer = new KafkaConsumer[K, V](kafkaParams)
     consumer.subscribe(topics)
-    if (currentOffsets.isEmpty) {
-      offsets.asScala.foreach { case (topicPartition, offset) =>
+    val toSeek = if (currentOffsets.isEmpty) {
+      offsets
+    } else {
+      currentOffsets
+    }
+    if (!toSeek.isEmpty) {
+      // work around KAFKA-3370 when reset is none
+      // poll will throw if no position, i.e. auto offset reset none and no explicit position
+      // but can't seek to a position before poll, because poll is what gets subscription partitions
+      // So, poll, suppress the first exception, then seek
+      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+      try {
+        consumer.poll(0)
+      } catch {
+        case x: NoOffsetForPartitionException if shouldSuppress =>
+          logWarning("Catching NoOffsetForPartitionException since " +
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
+      }
+      toSeek.asScala.foreach { case (topicPartition, offset) =>
+          consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ * @param pattern pattern to subscribe to
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+private case class SubscribePattern[K, V](
+    pattern: ju.regex.Pattern,
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, jl.Long]
+  ) extends ConsumerStrategy[K, V] with Logging {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
+    val toSeek = if (currentOffsets.isEmpty) {
+      offsets
+    } else {
+      currentOffsets
+    }
+    if (!toSeek.isEmpty) {
+      // work around KAFKA-3370 when reset is none, see explanation in Subscribe above
+      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+      val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
+      try {
+        consumer.poll(0)
+      } catch {
+        case x: NoOffsetForPartitionException if shouldSuppress =>
+          logWarning("Catching NoOffsetForPartitionException since " +
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
+      }
+      toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
       }
     }
@@ -113,8 +184,14 @@ private case class Assign[K, V](
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
     val consumer = new KafkaConsumer[K, V](kafkaParams)
     consumer.assign(topicPartitions)
-    if (currentOffsets.isEmpty) {
-      offsets.asScala.foreach { case (topicPartition, offset) =>
+    val toSeek = if (currentOffsets.isEmpty) {
+      offsets
+    } else {
+      currentOffsets
+    }
+    if (!toSeek.isEmpty) {
+      // this doesn't need a KAFKA-3370 workaround, because partitions are known; no poll is needed
+      toSeek.asScala.foreach { case (topicPartition, offset) =>
           consumer.seek(topicPartition, offset)
       }
     }
@@ -215,6 +292,95 @@ object ConsumerStrategies {
     new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
 
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: collection.Map[String, Object],
+      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](
+      pattern,
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+  }
+
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](
+      pattern,
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
+  }
+
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: ju.Map[String, Object],
+      offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](pattern, kafkaParams, offsets)
+  }
+
+  /** :: Experimental ::
+   * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+   * The pattern matching will be done periodically against topics existing at the time of check.
+   * @param pattern pattern to subscribe to
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def SubscribePattern[K, V](
+      pattern: ju.regex.Pattern,
+      kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
+    new SubscribePattern[K, V](
+      pattern,
+      kafkaParams,
+      ju.Collections.emptyMap[TopicPartition, jl.Long]())
+  }
+
   /**
    *  :: Experimental ::
    * Assign a fixed collection of TopicPartitions
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index ac8d64b180..ba57b6beb2 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.regex.Pattern;
 
 import scala.collection.JavaConverters;
 
@@ -32,6 +33,7 @@ public class JavaConsumerStrategySuite implements Serializable {
   @Test
   public void testConsumerStrategyConstructors() {
     final String topic1 = "topic1";
+    final Pattern pat = Pattern.compile("top.*");
     final Collection<String> topics = Arrays.asList(topic1);
     final scala.collection.Iterable<String> sTopics =
       JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
@@ -69,6 +71,19 @@ public class JavaConsumerStrategySuite implements Serializable {
       sub1.executorKafkaParams().get("bootstrap.servers"),
       sub3.executorKafkaParams().get("bootstrap.servers"));
 
+    final ConsumerStrategy<String, String> psub1 =
+      ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> psub2 =
+      ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams);
+    final ConsumerStrategy<String, String> psub3 =
+      ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> psub4 =
+      ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams);
+
+    Assert.assertEquals(
+      psub1.executorKafkaParams().get("bootstrap.servers"),
+      psub3.executorKafkaParams().get("bootstrap.servers"));
+
     final ConsumerStrategy<String, String> asn1 =
       ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets);
     final ConsumerStrategy<String, String> asn2 =
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 0a53259802..c9e15bcba0 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -103,7 +103,9 @@ class DirectKafkaStreamSuite
       kafkaTestUtils.createTopic(t)
       kafkaTestUtils.sendMessages(t, data)
     }
-    val totalSent = data.values.sum * topics.size
+    val offsets = Map(new TopicPartition("basic3", 0) -> 2L)
+    // one topic starts 2 messages later
+    val expectedTotal = (data.values.sum * topics.size) - 2
     val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
 
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
@@ -111,7 +113,7 @@ class DirectKafkaStreamSuite
       KafkaUtils.createDirectStream[String, String](
         ssc,
         preferredHosts,
-        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala))
+        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams.asScala, offsets))
     }
     val allReceived = new ConcurrentLinkedQueue[(String, String)]()
 
@@ -149,13 +151,78 @@ class DirectKafkaStreamSuite
     }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(allReceived.size === totalSent,
+      assert(allReceived.size === expectedTotal,
         "didn't get expected number of messages, messages:\n" +
           allReceived.asScala.mkString("\n"))
     }
     ssc.stop()
   }
 
+  test("pattern based subscription") {
+    val topics = List("pat1", "pat2", "advanced3")
+    // Should match 2 out of 3 topics
+    val pat = """pat\d""".r.pattern
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      kafkaTestUtils.createTopic(t)
+      kafkaTestUtils.sendMessages(t, data)
+    }
+    val offsets = Map(new TopicPartition("pat2", 0) -> 3L)
+    // 2 matching topics, one of which starts 3 messages later
+    val expectedTotal = (data.values.sum * 2) - 3
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.SubscribePattern[String, String](pat, kafkaParams.asScala, offsets))
+    }
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+    // hold a reference to the current offset ranges, so it can be used downstream
+    var offsetRanges = Array[OffsetRange]()
+    val tf = stream.transform { rdd =>
+      // Get the offset ranges in the RDD
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd.map(r => (r.key, r.value))
+    }
+
+    tf.foreachRDD { rdd =>
+      for (o <- offsetRanges) {
+        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+      }
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        // For each partition, get size of the range in the partition,
+        // and the number of items in the partition
+        val off = offsetRanges(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        Iterator((partSize, rangeSize))
+      }.collect
+
+      // Verify whether number of elements in each partition
+      // matches with the corresponding offset range
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+    }
+
+    stream.foreachRDD { rdd =>
+      allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
+    }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === expectedTotal,
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
+    }
+    ssc.stop()
+  }
+
+
   test("receiving from largest starting offset") {
     val topic = "latest"
     val topicPartition = new TopicPartition(topic, 0)
@@ -228,6 +295,7 @@ class DirectKafkaStreamSuite
     kc.close()
 
     // Setup context and kafka stream with largest offset
+    kafkaParams.put("auto.offset.reset", "none")
     ssc = new StreamingContext(sparkConf, Milliseconds(200))
     val stream = withClue("Error creating direct stream") {
       val s = new DirectKafkaInputDStream[String, String](
-- 
GitLab