Skip to content
Snippets Groups Projects
Commit 47af7c1e authored by cody koeninger's avatar cody koeninger Committed by Sean Owen
Browse files

[SPARK-8389] [STREAMING] [KAFKA] Example of getting offset ranges out o…

…f the existing java direct stream api

Author: cody koeninger <cody@koeninger.org>

Closes #6846 from koeninger/SPARK-8389 and squashes the following commits:

3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api
parent ebd363ae
No related branches found
No related tags found
No related merge requests found
......@@ -32,6 +32,7 @@ import org.junit.Test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
......@@ -65,8 +66,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
@Test
public void testKafkaStream() throws InterruptedException {
String topic1 = "topic1";
String topic2 = "topic2";
final String topic1 = "topic1";
final String topic2 = "topic2";
String[] topic1data = createTopicAndSendData(topic1);
String[] topic2data = createTopicAndSendData(topic2);
......@@ -87,6 +88,16 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
StringDecoder.class,
kafkaParams,
topicToSet(topic1)
).transformToPair(
// Make sure you can get offset ranges from the rdd
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
Assert.assertEquals(offsets[0].topic(), topic1);
return rdd;
}
}
).map(
new Function<Tuple2<String, String>, String>() {
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment