Skip to content
Snippets Groups Projects
Commit 48a9804b authored by cody koeninger, committed by Sean Owen
Browse files

[SPARK-12103][STREAMING][KAFKA][DOC] document that K means Key and V means Value

Author: cody koeninger <cody@koeninger.org>

Closes #10132 from koeninger/SPARK-12103.
parent 4a39b5a1
No related branches found
No related tags found
No related merge requests found
...@@ -51,6 +51,7 @@ object KafkaUtils { ...@@ -51,6 +51,7 @@ object KafkaUtils {
* in its own thread * in its own thread
* @param storageLevel Storage level to use for storing the received objects * @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2) * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createStream( def createStream(
ssc: StreamingContext, ssc: StreamingContext,
...@@ -74,6 +75,11 @@ object KafkaUtils { ...@@ -74,6 +75,11 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread. * in its own thread.
* @param storageLevel Storage level to use for storing the received objects * @param storageLevel Storage level to use for storing the received objects
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam U type of Kafka message key decoder
* @tparam T type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
ssc: StreamingContext, ssc: StreamingContext,
...@@ -93,6 +99,7 @@ object KafkaUtils { ...@@ -93,6 +99,7 @@ object KafkaUtils {
* @param groupId The group id for this consumer * @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread * in its own thread
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createStream( def createStream(
jssc: JavaStreamingContext, jssc: JavaStreamingContext,
...@@ -111,6 +118,7 @@ object KafkaUtils { ...@@ -111,6 +118,7 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread. * in its own thread.
* @param storageLevel RDD storage level. * @param storageLevel RDD storage level.
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createStream( def createStream(
jssc: JavaStreamingContext, jssc: JavaStreamingContext,
...@@ -135,6 +143,11 @@ object KafkaUtils { ...@@ -135,6 +143,11 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread * in its own thread
* @param storageLevel RDD storage level. * @param storageLevel RDD storage level.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam U type of Kafka message key decoder
* @tparam T type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createStream[K, V, U <: Decoder[_], T <: Decoder[_]]( def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext, jssc: JavaStreamingContext,
...@@ -219,6 +232,11 @@ object KafkaUtils { ...@@ -219,6 +232,11 @@ object KafkaUtils {
* host1:port1,host2:port2 form. * host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a * @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition * range of offsets for a given Kafka topic/partition
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return RDD of (Kafka message key, Kafka message value)
*/ */
def createRDD[ def createRDD[
K: ClassTag, K: ClassTag,
...@@ -251,6 +269,12 @@ object KafkaUtils { ...@@ -251,6 +269,12 @@ object KafkaUtils {
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
* in which case leaders will be looked up on the driver. * in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type * @param messageHandler Function for translating each message and metadata into the desired type
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return RDD of R
*/ */
def createRDD[ def createRDD[
K: ClassTag, K: ClassTag,
...@@ -288,6 +312,15 @@ object KafkaUtils { ...@@ -288,6 +312,15 @@ object KafkaUtils {
* host1:port1,host2:port2 form. * host1:port1,host2:port2 form.
* @param offsetRanges Each OffsetRange in the batch corresponds to a * @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition * range of offsets for a given Kafka topic/partition
* @param keyClass type of Kafka message key
* @param valueClass type of Kafka message value
* @param keyDecoderClass type of Kafka message key decoder
* @param valueDecoderClass type of Kafka message value decoder
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return RDD of (Kafka message key, Kafka message value)
*/ */
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]]( def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jsc: JavaSparkContext, jsc: JavaSparkContext,
...@@ -321,6 +354,12 @@ object KafkaUtils { ...@@ -321,6 +354,12 @@ object KafkaUtils {
* @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map, * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
* in which case leaders will be looked up on the driver. * in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type * @param messageHandler Function for translating each message and metadata into the desired type
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return RDD of R
*/ */
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jsc: JavaSparkContext, jsc: JavaSparkContext,
...@@ -373,6 +412,12 @@ object KafkaUtils { ...@@ -373,6 +412,12 @@ object KafkaUtils {
* @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream * starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type * @param messageHandler Function for translating each message and metadata into the desired type
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return DStream of R
*/ */
def createDirectStream[ def createDirectStream[
K: ClassTag, K: ClassTag,
...@@ -419,6 +464,11 @@ object KafkaUtils { ...@@ -419,6 +464,11 @@ object KafkaUtils {
* If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest") * to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume * @param topics Names of the topics to consume
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createDirectStream[ def createDirectStream[
K: ClassTag, K: ClassTag,
...@@ -470,6 +520,12 @@ object KafkaUtils { ...@@ -470,6 +520,12 @@ object KafkaUtils {
* @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
* starting point of the stream * starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type * @param messageHandler Function for translating each message and metadata into the desired type
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @tparam R type returned by messageHandler
* @return DStream of R
*/ */
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R]( def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jssc: JavaStreamingContext, jssc: JavaStreamingContext,
...@@ -529,6 +585,11 @@ object KafkaUtils { ...@@ -529,6 +585,11 @@ object KafkaUtils {
* If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest" * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
* to determine where the stream starts (defaults to "largest") * to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume * @param topics Names of the topics to consume
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/ */
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]]( def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jssc: JavaStreamingContext, jssc: JavaStreamingContext,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment