From b0c0021953826bccaee818a54afc44e8bdfa8572 Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Wed, 4 Feb 2015 12:06:34 -0800
Subject: [PATCH] [SPARK-4964] [Streaming] Exactly-once semantics for Kafka

Author: cody koeninger <cody@koeninger.org>

Closes #3798 from koeninger/kafkaRdd and squashes the following commits:

1dc2941 [cody koeninger] [SPARK-4964] silence ConsumerConfig warnings about broker connection props
59e29f6 [cody koeninger] [SPARK-4964] settle on "Direct" as a naming convention for the new stream
8c31855 [cody koeninger] [SPARK-4964] remove HasOffsetRanges interface from return types
0df3ebe [cody koeninger] [SPARK-4964] add comments per pwendell / dibbhatt
8991017 [cody koeninger] [SPARK-4964] formatting
825110f [cody koeninger] [SPARK-4964] rename stuff per TD
4354bce [cody koeninger] [SPARK-4964] per td, remove java interfaces, replace with final classes, corresponding changes to KafkaRDD constructor and checkpointing
9adaa0a [cody koeninger] [SPARK-4964] formatting
0090553 [cody koeninger] [SPARK-4964] javafication of interfaces
9a838c2 [cody koeninger] [SPARK-4964] code cleanup, add more tests
2b340d8 [cody koeninger] [SPARK-4964] refactor per TD feedback
80fd6ae [cody koeninger] [SPARK-4964] Rename createExactlyOnceStream so it isn't over-promising, change doc
99d2eba [cody koeninger] [SPARK-4964] Reduce level of nesting.  If beginning is past end, it's actually an error (may happen if Kafka topic was deleted and recreated)
19406cc [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
2e67117 [cody koeninger] [SPARK-4964] one potential way of hiding most of the implementation, while still allowing access to offsets (but not subclassing)
bb80bbe [cody koeninger] [SPARK-4964] scalastyle line length
d4a7cf7 [cody koeninger] [SPARK-4964] allow for use cases that need to override compute for custom kafka dstreams
c1bd6d9 [cody koeninger] [SPARK-4964] use newly available attemptNumber for correct retry behavior
548d529 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
0458e4e [cody koeninger] [SPARK-4964] recovery of generated rdds from checkpoint
e86317b [cody koeninger] [SPARK-4964] try seed brokers in random order to spread metadata requests
e93eb72 [cody koeninger] [SPARK-4964] refactor to add preferredLocations.  depends on SPARK-4014
356c7cc [cody koeninger] [SPARK-4964] code cleanup per helena
adf99a6 [cody koeninger] [SPARK-4964] fix serialization issues for checkpointing
1d50749 [cody koeninger] [SPARK-4964] code cleanup per tdas
8bfd6c0 [cody koeninger] [SPARK-4964] configure rate limiting via spark.streaming.receiver.maxRate
e09045b [cody koeninger] [SPARK-4964] add foreachPartitionWithIndex, to avoid doing equivalent map + empty foreach boilerplate
cac63ee [cody koeninger] additional testing, fix fencepost error
37d3053 [cody koeninger] make KafkaRDDPartition available to users so offsets can be committed per partition
bcca8a4 [cody koeninger] Merge branch 'master' of https://github.com/apache/spark into kafkaRdd
6bf14f2 [cody koeninger] first attempt at a Kafka dstream that allows for exactly-once semantics
326ff3c [cody koeninger] add some tests
38bb727 [cody koeninger] give easy access to the parameters of a KafkaRDD
979da25 [cody koeninger] don't allow empty leader offsets to be returned
8d7de4a [cody koeninger] make sure leader offsets can be found even for leaders that aren't in the seed brokers
4b078bf [cody koeninger] differentiate between leader and consumer offsets in error message
3c2a96a [cody koeninger] fix scalastyle errors
29c6b43 [cody koeninger] cleanup logging
783b477 [cody koeninger] update tests for kafka 8.1.1
7d050bc [cody koeninger] methods to set consumer offsets and get topic metadata, switch back to inclusive start / exclusive end to match typical kafka consumer behavior
ce91c59 [cody koeninger] method to get consumer offsets, explicit error handling
4dafd1b [cody koeninger] method to get leader offsets, switch rdd bound to being exclusive start, inclusive end to match offsets typically returned from cluster
0b94b33 [cody koeninger] use dropWhile rather than filter to trim beginning of fetch response
1d70625 [cody koeninger] WIP on kafka cluster
76913e2 [cody koeninger] Batch oriented kafka rdd, WIP. todo: cluster metadata / finding leader
---
 external/kafka/pom.xml                        |   2 +-
 .../kafka/DirectKafkaInputDStream.scala       | 162 ++++++++
 .../spark/streaming/kafka/KafkaCluster.scala  | 369 ++++++++++++++++++
 .../spark/streaming/kafka/KafkaRDD.scala      | 224 +++++++++++
 .../streaming/kafka/KafkaRDDPartition.scala   |  59 +++
 .../spark/streaming/kafka/KafkaUtils.scala    | 178 ++++++++-
 .../apache/spark/streaming/kafka/Leader.scala |  46 +++
 .../spark/streaming/kafka/OffsetRange.scala   |  70 ++++
 .../streaming/kafka/KafkaClusterSuite.scala   |  73 ++++
 .../kafka/KafkaDirectStreamSuite.scala        |  92 +++++
 .../spark/streaming/kafka/KafkaRDDSuite.scala |  99 +++++
 .../streaming/kafka/KafkaStreamSuite.scala    |   8 +-
 12 files changed, 1376 insertions(+), 6 deletions(-)
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
 create mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index b29b050965..af96138d79 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.0</version>
+      <version>0.8.1.1</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000000..c7bca43eb8
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The Spark configuration spark.streaming.kafka.maxRatePerPartition gives
+ * the maximum number of messages per second that each '''partition'''
+ * will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ * see {@link org.apache.spark.streaming.kafka.KafkaCluster}.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+ *  starting point of the stream
+ * @param messageHandler function for translating each message into the desired type
+ * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag](
+    @transient ssc_ : StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R
+) extends InputDStream[R](ssc_) with Logging {
+  val maxRetries = context.sparkContext.getConf.getInt(
+    "spark.streaming.kafka.maxRetries", 1)
+
+  protected[streaming] override val checkpointData =
+    new DirectKafkaInputDStreamCheckpointData
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  protected val maxMessagesPerPartition: Option[Long] = {
+    val ratePerSec = context.sparkContext.getConf.getInt(
+      "spark.streaming.kafka.maxRatePerPartition", 0)
+    if (ratePerSec > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+      Some((secsPerBatch * ratePerSec).toLong)
+    } else {
+      None
+    }
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
+    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+    // Either.fold would confuse @tailrec, do it manually
+    if (o.isLeft) {
+      val err = o.left.get.toString
+      if (retries <= 0) {
+        throw new SparkException(err)
+      } else {
+        log.error(err)
+        Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        latestLeaderOffsets(retries - 1)
+      }
+    } else {
+      o.right.get
+    }
+  }
+
+  // limits the maximum number of messages per partition
+  protected def clamp(
+    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
+    maxMessagesPerPartition.map { mmp =>
+      leaderOffsets.map { case (tp, lo) =>
+        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+      }
+    }.getOrElse(leaderOffsets)
+  }
+
+  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
+    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
+    val rdd = KafkaRDD[K, V, U, T, R](
+      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
+
+    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
+    Some(rdd)
+  }
+
+  override def start(): Unit = {
+  }
+
+  def stop(): Unit = {
+  }
+
+  private[streaming]
+  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+    def batchForTime = data.asInstanceOf[mutable.HashMap[
+      Time, Array[OffsetRange.OffsetRangeTuple]]]
+
+    override def update(time: Time) {
+      batchForTime.clear()
+      generatedRDDs.foreach { kv =>
+        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
+        batchForTime += kv._1 -> a
+      }
+    }
+
+    override def cleanup(time: Time) { }
+
+    override def restore() {
+      // this is assuming that the topics don't change during execution, which is true currently
+      val topics = fromOffsets.keySet
+      val leaders = kc.findLeaders(topics).fold(
+        errs => throw new SparkException(errs.mkString("\n")),
+        ok => ok
+      )
+
+      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+          logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+          generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+            context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+      }
+    }
+  }
+
+}
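
A minimal usage sketch (not part of the patch): the two configuration keys read by this DStream, spark.streaming.kafka.maxRatePerPartition and spark.streaming.kafka.maxRetries, can be set on the application's SparkConf before the StreamingContext is created. The values below are illustrative, not defaults.

    import org.apache.spark.SparkConf

    // Cap each Kafka partition at roughly 10,000 messages per second of batch time,
    // and allow up to two retries when fetching the latest leader offsets for a batch.
    val conf = new SparkConf()
      .setAppName("direct-kafka-rate-limit")
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      .set("spark.streaming.kafka.maxRetries", "2")
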
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
new file mode 100644
index 0000000000..ccc62bfe8f
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.control.NonFatal
+import scala.util.Random
+import scala.collection.mutable.ArrayBuffer
+import java.util.Properties
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import org.apache.spark.SparkException
+
+/**
+ * Convenience methods for interacting with a Kafka cluster.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: SimpleConsumerConfig = null
+
+  def config: SimpleConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = SimpleConsumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val missing = topicAndPartitions.diff(leaderMap.keySet)
+        val err = new Err
+        err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
+        Left(err)
+      }
+    }
+    answer
+  }
+
+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.map { pm: PartitionMetadata =>
+          TopicAndPartition(tm.topic, pm.partitionId)
+        }
+      }
+    }
+  }
+
+  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
+    val req = TopicMetadataRequest(
+      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+    val errs = new Err
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      // error codes here indicate missing / just created topic,
+      // repeating on a different broker won't be useful
+      return Right(resp.topicsMetadata.toSet)
+    }
+    Left(errs)
+  }
+
+  // Leader offset api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+  // scalastyle:on
+
+  def getLatestLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
+
+  def getEarliestLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
+
+  def getLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition],
+      before: Long
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
+    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
+      r.map { kv =>
+        // mapValues isn't serializable, see SI-7005
+        kv._1 -> kv._2.head
+      }
+    }
+  }
+
+  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
+    m.groupBy(_._2).map { kv =>
+      kv._1 -> kv._2.keys.toSeq
+    }
+
+  def getLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition],
+      before: Long,
+      maxNumOffsets: Int
+    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
+    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
+      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
+      val leaders = leaderToTp.keys
+      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
+      val errs = new Err
+      withBrokers(leaders, errs) { consumer =>
+        val partitionsToGetOffsets: Seq[TopicAndPartition] =
+          leaderToTp((consumer.host, consumer.port))
+        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
+          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
+        }.toMap
+        val req = OffsetRequest(reqMap)
+        val resp = consumer.getOffsetsBefore(req)
+        val respMap = resp.partitionErrorAndOffsets
+        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
+          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
+            if (por.error == ErrorMapping.NoError) {
+              if (por.offsets.nonEmpty) {
+                result += tp -> por.offsets.map { off =>
+                  LeaderOffset(consumer.host, consumer.port, off)
+                }
+              } else {
+                errs.append(new SparkException(
+                  s"Empty offsets for ${tp}, is ${before} before log beginning?"))
+              }
+            } else {
+              errs.append(ErrorMapping.exceptionFor(por.error))
+            }
+          }
+        }
+        if (result.keys.size == topicAndPartitions.size) {
+          return Right(result)
+        }
+      }
+      val missing = topicAndPartitions.diff(result.keySet)
+      errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
+      Left(errs)
+    }
+  }
+
+  // Consumer offset api
+  // scalastyle:off
+  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+  // scalastyle:on
+
+  /** Requires Kafka >= 0.8.1.1 */
+  def getConsumerOffsets(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, Long]] = {
+    getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
+      r.map { kv =>
+        kv._1 -> kv._2.offset
+      }
+    }
+  }
+
+  /** Requires Kafka >= 0.8.1.1 */
+  def getConsumerOffsetMetadata(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
+    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
+    val errs = new Err
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp = consumer.fetchOffsets(req)
+      val respMap = resp.requestInfo
+      val needed = topicAndPartitions.diff(result.keySet)
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
+          if (ome.error == ErrorMapping.NoError) {
+            result += tp -> ome
+          } else {
+            errs.append(ErrorMapping.exceptionFor(ome.error))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keySet)
+    errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
+    Left(errs)
+  }
+
+  /** Requires Kafka >= 0.8.1.1 */
+  def setConsumerOffsets(
+      groupId: String,
+      offsets: Map[TopicAndPartition, Long]
+    ): Either[Err, Map[TopicAndPartition, Short]] = {
+    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
+      kv._1 -> OffsetMetadataAndError(kv._2)
+    })
+  }
+
+  /** Requires Kafka >= 0.8.1.1 */
+  def setConsumerOffsetMetadata(
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetMetadataAndError]
+    ): Either[Err, Map[TopicAndPartition, Short]] = {
+    var result = Map[TopicAndPartition, Short]()
+    val req = OffsetCommitRequest(groupId, metadata)
+    val errs = new Err
+    val topicAndPartitions = metadata.keySet
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp = consumer.commitOffsets(req)
+      val respMap = resp.requestInfo
+      val needed = topicAndPartitions.diff(result.keySet)
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { err: Short =>
+          if (err == ErrorMapping.NoError) {
+            result += tp -> err
+          } else {
+            errs.append(ErrorMapping.exceptionFor(err))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keySet)
+    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
+    Left(errs)
+  }
+
+  // Try a call against potentially multiple brokers, accumulating errors
+  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
+    (fn: SimpleConsumer => Any): Unit = {
+    brokers.foreach { hp =>
+      var consumer: SimpleConsumer = null
+      try {
+        consumer = connect(hp._1, hp._2)
+        fn(consumer)
+      } catch {
+        case NonFatal(e) =>
+          errs.append(e)
+      } finally {
+        if (consumer != null) {
+          consumer.close()
+        }
+      }
+    }
+  }
+}
+
+private[spark]
+object KafkaCluster {
+  type Err = ArrayBuffer[Throwable]
+
+  private[spark]
+  case class LeaderOffset(host: String, port: Int, offset: Long)
+
+  /**
+   * High-level kafka consumers connect to ZK.  ConsumerConfig assumes this use case.
+   * Simple consumers connect directly to brokers, but need many of the same configs.
+   * This subclass won't warn about missing ZK params, or presence of broker params.
+   */
+  private[spark]
+  class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
+      extends ConsumerConfig(originalProps) {
+    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
+      val hpa = hp.split(":")
+      (hpa(0), hpa(1).toInt)
+    }
+  }
+
+  private[spark]
+  object SimpleConsumerConfig {
+    /**
+     * Make a consumer config without requiring group.id or zookeeper.connect,
+     * since communicating with brokers also needs common settings such as timeout
+     */
+    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
+      // These keys are from other pre-existing Kafka configs for specifying brokers; accept either
+      val brokers = kafkaParams.get("metadata.broker.list")
+        .orElse(kafkaParams.get("bootstrap.servers"))
+        .getOrElse(throw new SparkException(
+          "Must specify metadata.broker.list or bootstrap.servers"))
+
+      val props = new Properties()
+      kafkaParams.foreach { case (key, value) =>
+        // prevent warnings on parameters ConsumerConfig doesn't know about
+        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
+          props.put(key, value)
+        }
+      }
+
+      Seq("zookeeper.connect", "group.id").foreach { s =>
+        if (!props.containsKey(s)) {
+          props.setProperty(s, "")
+        }
+      }
+
+      new SimpleConsumerConfig(brokers, props)
+    }
+  }
+}
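
A minimal sketch of the consumer-offset API above, assuming a broker at broker1:9092, a topic named "events", and a group id "my-group" (all illustrative). Because KafkaCluster is private[spark], code like this only compiles inside the org.apache.spark package tree, as the test suites below do.

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster

    val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092"))
    val tp = TopicAndPartition("events", 0)   // illustrative topic / partition

    // Commit an offset for the group, then read it back.  Both calls return
    // Either[Err, ...]; a Left carries every error accumulated across brokers.
    kc.setConsumerOffsets("my-group", Map(tp -> 42L)).left.foreach { errs =>
      throw new Exception(errs.mkString("\n"))
    }
    val restored: Long = kc.getConsumerOffsets("my-group", Set(tp)).fold(
      errs => throw new Exception(errs.mkString("\n")),
      offsets => offsets(tp))
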
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
new file mode 100644
index 0000000000..50bf7cbdb8
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.spark.{Logging, Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+import java.util.Properties
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param batch Each KafkaRDDPartition in the batch corresponds to a
+ *   range of offsets for a given Kafka topic/partition
+ * @param messageHandler function for translating each message into the desired type
+ */
+private[spark]
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag] private[spark] (
+    sc: SparkContext,
+    kafkaParams: Map[String, String],
+    val offsetRanges: Array[OffsetRange],
+    leaders: Map[TopicAndPartition, (String, Int)],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.zipWithIndex.map { case (o, i) =>
+        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
+    }.toArray
+  }
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    // TODO is additional hostname resolution necessary here
+    Seq(part.host)
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+    s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+    s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+    " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
+    s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+    s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
+    " This should not happen, and indicates a message may have been skipped"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+    if (part.fromOffset == part.untilOffset) {
+      log.warn(s"Beginning offset ${part.fromOffset} is the same as ending offset, " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new KafkaRDDIterator(part, context)
+    }
+  }
+
+  private class KafkaRDDIterator(
+      part: KafkaRDDPartition,
+      context: TaskContext) extends NextIterator[R] {
+
+    context.addTaskCompletionListener { _ => closeIfNeeded() }
+
+    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+    val kc = new KafkaCluster(kafkaParams)
+    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(kc.config.props)
+      .asInstanceOf[Decoder[K]]
+    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(kc.config.props)
+      .asInstanceOf[Decoder[V]]
+    val consumer = connectLeader
+    var requestOffset = part.fromOffset
+    var iter: Iterator[MessageAndOffset] = null
+
+    // The idea is to use the provided preferred host, except on task retry attempts,
+    // to minimize the number of kafka metadata requests
+    private def connectLeader: SimpleConsumer = {
+      if (context.attemptNumber > 0) {
+        kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new SparkException(
+            s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
+              errs.mkString("\n")),
+          consumer => consumer
+        )
+      } else {
+        kc.connect(part.host, part.port)
+      }
+    }
+
+    private def handleFetchErr(resp: FetchResponse) {
+      if (resp.hasError) {
+        val err = resp.errorCode(part.topic, part.partition)
+        if (err == ErrorMapping.LeaderNotAvailableCode ||
+          err == ErrorMapping.NotLeaderForPartitionCode) {
+          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
+            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+          Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        }
+        // Let normal rdd retry sort out reconnect attempts
+        throw ErrorMapping.exceptionFor(err)
+      }
+    }
+
+    private def fetchBatch: Iterator[MessageAndOffset] = {
+      val req = new FetchRequestBuilder()
+        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
+        .build()
+      val resp = consumer.fetch(req)
+      handleFetchErr(resp)
+      // kafka may return a batch that starts before the requested offset
+      resp.messageSet(part.topic, part.partition)
+        .iterator
+        .dropWhile(_.offset < requestOffset)
+    }
+
+    override def close() = consumer.close()
+
+    override def getNext(): R = {
+      if (iter == null || !iter.hasNext) {
+        iter = fetchBatch
+      }
+      if (!iter.hasNext) {
+        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
+        finished = true
+        null.asInstanceOf[R]
+      } else {
+        val item = iter.next()
+        if (item.offset >= part.untilOffset) {
+          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
+          finished = true
+          null.asInstanceOf[R]
+        } else {
+          requestOffset = item.nextOffset
+          messageHandler(new MessageAndMetadata(
+            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
+        }
+      }
+    }
+  }
+}
+
+private[spark]
+object KafkaRDD {
+  import KafkaCluster.LeaderOffset
+
+  /**
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+   *  starting point of the batch
+   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+   *  ending point of the batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  def apply[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag](
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      untilOffsets: Map[TopicAndPartition, LeaderOffset],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): KafkaRDD[K, V, U, T, R] = {
+    val leaders = untilOffsets.map { case (tp, lo) =>
+        tp -> (lo.host, lo.port)
+    }.toMap
+
+    val offsetRanges = fromOffsets.map { case (tp, fo) =>
+        val uo = untilOffsets(tp)
+        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+    }.toArray
+
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
new file mode 100644
index 0000000000..36372e08f6
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.Partition
+
+/** @param topic kafka topic name
+  * @param partition kafka partition id
+  * @param fromOffset inclusive starting offset
+  * @param untilOffset exclusive ending offset
+  * @param host preferred kafka host, i.e. the leader at the time the rdd was created
+  * @param port preferred kafka host's port
+  */
+private[spark]
+class KafkaRDDPartition(
+  val index: Int,
+  val topic: String,
+  val partition: Int,
+  val fromOffset: Long,
+  val untilOffset: Long,
+  val host: String,
+  val port: Int
+) extends Partition
+
+private[spark]
+object KafkaRDDPartition {
+  def apply(
+    index: Int,
+    topic: String,
+    partition: Int,
+    fromOffset: Long,
+    untilOffset: Long,
+    host: String,
+    port: Int
+  ): KafkaRDDPartition = new KafkaRDDPartition(
+    index,
+    topic,
+    partition,
+    fromOffset,
+    untilOffset,
+    host,
+    port
+  )
+}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index df725f0c65..f8aa6c5c62 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -23,12 +23,18 @@ import java.util.{Map => JMap}
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.{Decoder, StringDecoder}
 
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
 
 object KafkaUtils {
   /**
@@ -144,4 +150,174 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): RDD[(K, V)] = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
+    val leaders = kc.findLeaders(topics).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+  }
+
+  /** A batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka leaders for each offset range in batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  @Experimental
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Array[Leader],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): RDD[R] = {
+
+    val leaderMap = leaders
+      .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
+      .toMap
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
+   * as the fromOffsets parameter on restart.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+   * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to either ensure that the output operation is
+   * idempotent, or transactionally store offsets with the output. See the programming guide for
+   * more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param messageHandler function for translating each message into the desired type
+   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+   *  starting point of the stream
+   */
+  @Experimental
+  def createDirectStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): InputDStream[R] = {
+    new DirectKafkaInputDStream[K, V, U, T, R](
+      ssc, kafkaParams, fromOffsets, messageHandler)
+  }
+
+  /**
+   * This stream can guarantee that each message from Kafka is included in transformations
+   * (as opposed to output actions) exactly once, even in most failure situations.
+   *
+   * Points to note:
+   *
+   * Failure Recovery - You must checkpoint this stream.
+   * Kafka must have sufficient log retention to obtain messages after failure.
+   *
+   * Getting offsets from the stream - see programming guide
+   *
+   * Zookeeper - This does not use Zookeeper to store offsets.  For interop with Kafka monitors
+   * that depend on Zookeeper, you must store offsets in ZK yourself.
+   *
+   * End-to-end semantics - This does not guarantee that any output operation will push each record
+   * exactly once. To ensure end-to-end exactly-once semantics (that is, receiving exactly once and
+   * outputting exactly once), you have to ensure that the output operation is idempotent.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *   If starting without a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
+   *   to determine where the stream starts (defaults to "largest")
+   * @param topics names of the topics to consume
+   */
+  @Experimental
+  def createDirectStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Set[String]
+  ): InputDStream[(K, V)] = {
+    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
+    val kc = new KafkaCluster(kafkaParams)
+    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
+
+    (for {
+      topicPartitions <- kc.getPartitions(topics).right
+      leaderOffsets <- (if (reset == Some("smallest")) {
+        kc.getEarliestLeaderOffsets(topicPartitions)
+      } else {
+        kc.getLatestLeaderOffsets(topicPartitions)
+      }).right
+    } yield {
+      val fromOffsets = leaderOffsets.map { case (tp, lo) =>
+          (tp, lo.offset)
+      }
+      new DirectKafkaInputDStream[K, V, U, T, (K, V)](
+        ssc, kafkaParams, fromOffsets, messageHandler)
+    }).fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+  }
 }
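
A hedged usage sketch of the new createDirectStream overload, assuming a broker at broker1:9092 and a topic named "events" (both illustrative). Each RDD the stream produces is a KafkaRDD, so it can be cast to HasOffsetRanges to recover the exact offsets of the batch, mirroring the pattern in KafkaDirectStreamSuite below.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val ssc = new StreamingContext(
      new SparkConf().setAppName("direct-kafka-example"), Seconds(2))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.foreachRDD { rdd =>
      // Record the offset ranges of this batch; storing them atomically with the
      // output (or making the output idempotent) is what gives end-to-end
      // exactly-once semantics.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic} ${r.partition} ${r.fromOffset} -> ${r.untilOffset}")
      }
    }
    ssc.start()
    ssc.awaitTermination()
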
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
new file mode 100644
index 0000000000..3454d92e72
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Host info for the leader of a Kafka TopicAndPartition */
+final class Leader private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** kafka hostname */
+    val host: String,
+    /** kafka host's port */
+    val port: Int) extends Serializable
+
+object Leader {
+  def create(topic: String, partition: Int, host: String, port: Int): Leader =
+    new Leader(topic, partition, host, port)
+
+  def create(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
+    new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+
+  def apply(topic: String, partition: Int, host: String, port: Int): Leader =
+    new Leader(topic, partition, host, port)
+
+  def apply(topicAndPartition: TopicAndPartition, host: String, port: Int): Leader =
+    new Leader(topicAndPartition.topic, topicAndPartition.partition, host, port)
+
+}
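
If the leader host and port for each partition are already known (for example, saved alongside the offsets), they can be passed to the createRDD overload that takes an Array[Leader] to skip the metadata lookup. A sketch with illustrative broker, topic, and offset values:

    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, Leader, OffsetRange}

    val sc = new SparkContext(new SparkConf().setAppName("kafka-rdd-leaders"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val offsetRanges = Array(OffsetRange("events", 0, 0L, 100L))   // illustrative range
    val leaders = Array(Leader("events", 0, "broker1", 9092))      // known leader for that partition

    // messageHandler keeps only the message payload
    val handler = (mmd: MessageAndMetadata[String, String]) => mmd.message
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, leaders, handler)
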
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
new file mode 100644
index 0000000000..334c12e462
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/** Something that has a collection of OffsetRanges */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/** Represents a range of offsets from a single Kafka TopicAndPartition */
+final class OffsetRange private(
+    /** kafka topic name */
+    val topic: String,
+    /** kafka partition id */
+    val partition: Int,
+    /** inclusive starting offset */
+    val fromOffset: Long,
+    /** exclusive ending offset */
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+object OffsetRange {
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def create(
+      topicAndPartition: TopicAndPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def apply(
+      topicAndPartition: TopicAndPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[spark]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  private[streaming]
+  def apply(t: OffsetRangeTuple) =
+    new OffsetRange(t._1, t._2, t._3, t._4)
+}
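
A minimal sketch of building a batch of OffsetRange values and handing them to KafkaUtils.createRDD (topic name, broker address, and offsets are illustrative). Messages in [fromOffset, untilOffset) are read, matching the inclusive-start / exclusive-end convention documented above.

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val sc = new SparkContext(new SparkConf().setAppName("kafka-rdd-example"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // One batch: offsets [0, 100) of partition 0 and [0, 50) of partition 1.
    val offsetRanges = Array(
      OffsetRange("events", 0, 0L, 100L),
      OffsetRange("events", 1, 0L, 50L))

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(s"read ${rdd.count()} messages")   // expect 150 if those offsets exist
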
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000000..e57c8f6987
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfter
+import kafka.common.TopicAndPartition
+
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  val brokerHost = "localhost"
+
+  val kafkaParams = Map("metadata.broker.list" -> s"$brokerHost:$brokerPort")
+
+  val kc = new KafkaCluster(kafkaParams)
+
+  val topic = "kcsuitetopic" + Random.nextInt(10000)
+
+  val topicAndPartition = TopicAndPartition(topic, 0)
+
+  before {
+    setupKafka()
+    createTopic(topic)
+    produceAndSendMessage(topic, Map("a" -> 1))
+  }
+
+  after {
+    tearDownKafka()
+  }
+
+  test("metadata apis") {
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get
+    assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+
+    val parts = kc.getPartitions(Set(topic)).right.get
+    assert(parts(topicAndPartition), "didn't get partitions")
+  }
+
+  test("leader offset apis") {
+    val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+  }
+
+  test("consumer offset apis") {
+    val group = "kcsuitegroup" + Random.nextInt(10000)
+
+    val offset = Random.nextInt(10000)
+
+    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+    assert(set.isRight, "didn't set consumer offsets")
+
+    val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+    assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+  }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
new file mode 100644
index 0000000000..0891ce344f
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaDirectStreamSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaDirectStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+
+  val brokerHost = "localhost"
+
+  // lazy, so that brokerPort reflects the port setupKafka() actually bound
+  lazy val kafkaParams = Map(
+    "metadata.broker.list" -> s"$brokerHost:$brokerPort",
+    "auto.offset.reset" -> "smallest")
+
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+    }
+    tearDownKafka()
+  }
+
+  test("multi topic stream") {
+    val topics = Set("newA", "newB")
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      createTopic(t)
+      produceAndSendMessage(t, data)
+    }
+    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topics)
+    var total = 0L
+
+    stream.foreachRDD { rdd =>
+      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        val off = offsets(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        all.map { _ =>
+          (partSize, rangeSize)
+        }.toIterator
+      }.collect
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+      total += collected.size
+    }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(total === data.values.sum * topics.size, "didn't get all messages")
+    }
+    ssc.stop()
+  }
+}
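
For orientation, a sketch (not part of the patch) of how an application might consume the same direct stream outside the test harness: it recovers the per-partition offset ranges of each batch through the same HasOffsetRanges cast the test relies on, so they can be recorded alongside the output. The broker address, topic name, and printing in place of real output/offset storage are placeholders.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Hypothetical application code using the new direct stream.
object DirectStreamOffsetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("direct-stream-offsets")
    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    stream.foreachRDD { rdd =>
      // same cast the test uses to recover per-partition offset ranges
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // process the batch first, then record where each partition ended,
      // so offsets can be stored together with the output
      rdd.foreach(record => println(record))
      ranges.foreach { r =>
        println(s"${r.topic} [${r.partition}] ${r.fromOffset} -> ${r.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
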
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000..9b9e3f5fce
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    // this is the "lots of messages" case
+    // make sure we get all of them
+    assert(rdd.isDefined)
+    assert(rdd.get.count === sent.values.sum)
+
+    kc.setConsumerOffsets(
+      kafkaParams("group.id"),
+      rdd.get.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
+
+    val rdd2 = getRdd(kc, Set(topic))
+    val sent2 = Map("d" -> 1)
+    produceAndSendMessage(topic, sent2)
+    // this is the "0 messages" case
+    // make sure we dont get anything, since messages were sent after rdd was defined
+    assert(rdd2.isDefined)
+    assert(rdd2.get.count === 0)
+
+    val rdd3 = getRdd(kc, Set(topic))
+    produceAndSendMessage(topic, Map("extra" -> 22))
+    // this is the "exactly 1 message" case
+    // make sure we get exactly one message, despite there being lots more available
+    assert(rdd3.isDefined)
+    assert(rdd3.get.count === sent2.values.sum)
+
+  }
+
+  // get an rdd spanning the committed consumer offsets up to the latest leader offsets
+  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+    val groupId = kc.kafkaParams("group.id")
+    for {
+      topicPartitions <- kc.getPartitions(topics).right.toOption
+      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+          offs.map(kv => kv._1 -> kv._2.offset)
+        }
+      )
+      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+    } yield {
+      KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+        sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
+    }
+  }
+}
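
getRdd above builds a KafkaRDD spanning a group's committed offsets up to the latest leader offsets. As a rough illustration only, the following sketch (not part of the patch) does the same over a topic's full earliest-to-latest range; it assumes the org.apache.spark.streaming.kafka package so KafkaRDD and KafkaCluster resolve, and the broker and topic names are placeholders.

package org.apache.spark.streaming.kafka

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical batch job mirroring the getRdd helper in KafkaRDDSuite.
object WholeTopicRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("whole-topic-rdd"))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val kc = new KafkaCluster(kafkaParams)

    // same shape as getRdd, but spanning earliest-to-latest leader offsets
    // instead of committed-consumer-offsets-to-latest
    val maybeRdd = for {
      partitions <- kc.getPartitions(Set("mytopic")).right.toOption
      earliest <- kc.getEarliestLeaderOffsets(partitions).right.toOption
      latest <- kc.getLatestLeaderOffsets(partitions).right.toOption
    } yield {
      KafkaRDD[String, String, StringDecoder, StringDecoder, (Long, String)](
        sc, kafkaParams,
        earliest.map { case (tp, lo) => tp -> lo.offset }, // start offsets as Longs
        latest,                                            // end offsets as LeaderOffsets
        mmd => (mmd.offset, mmd.message))                  // messageHandler, as in getRdd
    }

    maybeRdd.foreach(rdd => println(s"messages currently in topic: ${rdd.count}"))
    sc.stop()
  }
}
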
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 0817c56d8f..f207dc6d4f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.{StringDecoder, StringEncoder}
@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
   private val zkSessionTimeout = 6000
   private var zookeeper: EmbeddedZookeeper = _
   private var zkPort: Int = 0
-  private var brokerPort = 9092
+  protected var brokerPort = 9092
   private var brokerConf: KafkaConfig = _
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
@@ -130,7 +130,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
   }
 
   def createTopic(topic: String) {
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
     waitUntilMetadataIsPropagated(topic, 0)
@@ -166,7 +166,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
       assert(
-        server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+        server.apis.metadataCache.containsTopicAndPartition(topic, partition),
         s"Partition [$topic, $partition] metadata not propagated after timeout"
       )
     }
-- 
GitLab