Unverified Commit ab8da141 authored by Liwei Lin, committed by Sean Owen

[SPARK-18198][DOC][STREAMING] Highlight code snippets

## What changes were proposed in this pull request?

This patch uses `{% highlight lang %}...{% endhighlight %}` to highlight code snippets in the `Structured Streaming Kafka010 integration doc` and the `Spark Streaming Kafka010 integration doc`.

This patch consists of two commits:
- the first commit fixes only the leading spaces -- this is large
- the second commit adds the highlight instructions -- this is much simpler and easier to review
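
For reference, the highlighting directive added by the second commit wraps each snippet in the markdown source roughly like this (a minimal sketch; the one-line Scala body is only illustrative):

    {% highlight scala %}
    val topics = Array("topicA", "topicB")
    {% endhighlight %}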

## How was this patch tested?

`SKIP_API=1 jekyll build`
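
A rough sketch of that check (assuming the command is run from the `docs/` directory of the Spark source tree):

    # build the docs without generating the API docs, which is much faster
    cd docs
    SKIP_API=1 jekyll build
    # Jekyll writes the rendered HTML under _site/ by default; open the Kafka
    # integration pages there to verify the highlighted snippets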

## Screenshots

**Before**

![snip20161101_3](https://cloud.githubusercontent.com/assets/15843379/19894258/47746524-a087-11e6-9a2a-7bff2d428d44.png)

**After**

![snip20161101_1](https://cloud.githubusercontent.com/assets/15843379/19894324/8bebcd1e-a087-11e6-835b-88c4d2979cfa.png)

Author: Liwei Lin <lwlin7@gmail.com>

Closes #15715 from lw-lin/doc-highlight-code-snippet.

(cherry picked from commit 98ede494)
Signed-off-by: Sean Owen <sowen@cloudera.com>
parent 3b624bed
@@ -17,69 +17,72 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
{% endhighlight %}

Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

final JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(
  new PairFunction<ConsumerRecord<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
      return new Tuple2<>(record.key(), record.value());
    }
  })
{% endhighlight %}
</div>
</div>
@@ -109,32 +112,35 @@ If you have a use case that is better suited to batch processing, you can create

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
{% endhighlight %}
</div>
</div>
@@ -144,29 +150,33 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
      @Override
      public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(
          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
      }
    });
  }
});
{% endhighlight %}
</div>
</div>
@@ -183,25 +193,28 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic. By

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{% endhighlight %}

As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // some time later, after outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
  }
});
{% endhighlight %}
</div>
</div>
@@ -210,64 +223,68 @@ For data stores that support transactions, saving offsets in the same transactio

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    Object results = yourCalculation(rdd);

    // begin your transaction

    // update results
    // update offsets where the end of existing offsets matches the beginning of this batch of offsets
    // assert that offsets were updated correctly

    // end your transaction
  }
});
{% endhighlight %}
</div>
</div>
@@ -277,25 +294,29 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
{% endhighlight %}
</div>
</div>
...
@@ -19,97 +19,103 @@ application. See the [Deploying](#deploying) subsection below.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Subscribe to 1 topic
val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val ds2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val ds3 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Subscribe to 1 topic
Dataset<Row> ds1 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to multiple topics
Dataset<Row> ds2 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to a pattern
Dataset<Row> ds3 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Subscribe to 1 topic
ds1 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
ds2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
ds3 = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
</div>
</div>
...