Commit 505b927c authored by Liwei Lin, committed by Sean Owen

[SPARK-16312][FOLLOW-UP][STREAMING][KAFKA][DOC] Add java code snippet for Kafka 0.10 integration doc

## What changes were proposed in this pull request?

Added Java code snippets for the Kafka 0.10 integration doc.

## How was this patch tested?

`SKIP_API=1 jekyll build`

## Screenshot

![kafka-doc](https://cloud.githubusercontent.com/assets/15843379/19826272/bf0d8a4c-9db8-11e6-9e40-1396723df4bc.png)

Author: Liwei Lin <lwlin7@gmail.com>

Closes #15679 from lw-lin/kafka-010-examples.
@@ -8,9 +8,9 @@ The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [
### Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION_SHORT}}

### Creating a Direct Stream
Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010
@@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
</div>
<div data-lang="java" markdown="1">
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(
  new PairFunction<ConsumerRecord<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
      return new Tuple2<>(record.key(), record.value());
    }
  });
</div>
</div>
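The snippet above defines the stream and a pair transformation but registers no output operation. As a minimal sketch (not part of the original example), assuming the `stream` and `streamingContext` values created above, an output can be registered and the job started like this:

// Hypothetical continuation of the example above; `stream` and
// `streamingContext` are the values created in the previous snippet.
stream.map(new Function<ConsumerRecord<String, String>, String>() {
  @Override
  public String call(ConsumerRecord<String, String> record) {
    // format each record as "key: value" for the console output below
    return record.key() + ": " + record.value();
  }
}).print();                          // print a few elements of each batch

streamingContext.start();            // start receiving and processing data
streamingContext.awaitTermination(); // block until the job is stopped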
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create
</div>
<div data-lang="java" markdown="1">
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
</div>
</div>
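As a hedged usage sketch (not from the original doc), the resulting batch RDD can be processed like any other RDD; for example, collecting just the message values to the driver, which is only sensible for small offset ranges:

// Hypothetical usage of the `rdd` created above; imports (java.util.*,
// org.apache.spark.api.java.function.*) as in the Direct Stream example.
List<String> values = rdd.map(new Function<ConsumerRecord<String, String>, String>() {
  @Override
  public String call(ConsumerRecord<String, String> record) {
    return record.value();
  }
}).collect();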
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
}
</div>
<div data-lang="java" markdown="1">
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
      @Override
      public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(
          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
      }
    });
  }
});
</div>
</div>
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic. By
<div class="codetabs">
<div data-lang="scala" markdown="1">
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
</div>
<div data-lang="java" markdown="1">
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // some time later, after outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
  }
});
</div>
</div>
@@ -141,7 +214,7 @@ For data stores that support transactions, saving offsets in the same transactio
// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
@@ -155,16 +228,46 @@ For data stores that support transactions, saving offsets in the same transactio
  val results = yourCalculation(rdd)

  // begin your transaction
  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly
  // end your transaction
}
</div>
<div data-lang="java" markdown="1">
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
// (selectOffsetsFromYourDatabase is a placeholder for your own query and result type)
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase) {
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    Object results = yourCalculation(rdd);

    // begin your transaction
    // update results
    // update offsets where the end of existing offsets matches the beginning of this batch of offsets
    // assert that offsets were updated correctly
    // end your transaction
  }
});
</div>
</div>
@@ -185,6 +288,14 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html
)
</div>
<div data-lang="java" markdown="1">
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
</div>
</div>
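These SSL entries are simply additional keys in the same kafkaParams map used elsewhere. As a sketch (reusing the `topics` and `streamingContext` values from the Creating a Direct Stream example above; the variable name `secureStream` is illustrative), the map is passed to `createDirectStream` unchanged:

// Sketch: the SSL-enabled kafkaParams map is used exactly like the plain one.
JavaInputDStream<ConsumerRecord<String, String>> secureStream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );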