Commit b0a5cd89 authored by Tyson Condie's avatar Tyson Condie Committed by Tathagata Das

[SPARK-19719][SS] Kafka writer for both structured streaming and batch queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.
### Streaming Kafka Sink
- When `addBatch` is called:
  - If `batchId` is greater than the last written batch id:
    - Write the batch to Kafka.
    - The topic comes from the `topic` option if set (the option overrides any topic field in the record); otherwise it is taken from the record's topic field (see the sketch after this list).
  - Else ignore the batch (it has already been committed).
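
The topic resolution order mirrors what `KafkaWriteTask.createProjection` does later in this patch. A minimal sketch of the precedence (the helper `resolveTopic` is hypothetical and only illustrates the rule, it is not part of the patch):

```scala
// Hypothetical helper illustrating topic precedence; not part of the patch.
def resolveTopic(topicOption: Option[String], topicColumn: Option[String]): String =
  topicOption                    // 1. the "topic" option, if set, always wins
    .orElse(topicColumn)         // 2. otherwise use the record's "topic" column
    .getOrElse(throw new IllegalStateException(
      "topic option required when no 'topic' attribute is present"))
```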

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider.
- CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka.
- The topic comes from the `topic` option if set (the option overrides any topic field in the record); otherwise it is taken from the record's topic field.
- Save modes Append and ErrorIfExists are supported with identical semantics. Other save modes result in an AnalysisException (see the example below).
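
For illustration, a batch write with an unsupported save mode fails at analysis time. A minimal sketch, assuming `df`, `brokerAddress`, and `topic` are defined as in the examples further below:

```scala
// Only SaveMode.Append and SaveMode.ErrorIfExists are accepted by the Kafka sink.
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", topic)
  .mode(SaveMode.Overwrite)
  .save()  // throws AnalysisException: "Save mode Overwrite not allowed for Kafka..."
```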

tdas zsxwing

## How was this patch tested?

### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
- write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
- write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
- write batch to kafka: a simple batch write to Kafka, which goes through the same code path as the streaming scenario, so the validity checks are not repeated here.

### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
 .selectExpr("value as key", "value as value")
 .writeStream
 .format("kafka")
 .option("checkpointLocation", checkpointDir)
 .outputMode(OutputMode.Append)
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .queryName("kafkaStream")
 .start()

// Batch
val df = spark
 .sparkContext
 .parallelize(Seq("1", "2", "3", "4", "5"))
 .map(v => (topic, v))
 .toDF("topic", "value")

df.write
 .format("kafka")
 .option("kafka.bootstrap.servers",brokerAddress)
 .option("topic", topic)
 .save()
```
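
For reference, reading the written topic back for verification (as the test suite's `createKafkaReader` helper does) looks roughly like this, reusing `brokerAddress` and `topic` from above:

```scala
// Batch read of the topic to verify what was written.
val readBack = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) value")
```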
Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17043 from tcondie/kafka-writer.
parent f6471dc0
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
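/**
 * A [[Sink]] that writes each micro-batch of a streaming query to Kafka. Batches whose id is
 * not greater than the last successfully written batch id are skipped, so that batches
 * re-executed after a failure are not written twice.
 */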
private[kafka010] class KafkaSink(
    sqlContext: SQLContext,
    executorKafkaParams: ju.Map[String, Object],
    topic: Option[String]) extends Sink with Logging {
  @volatile private var latestBatchId = -1L

  override def toString(): String = "KafkaSink"

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      KafkaWriter.write(sqlContext.sparkSession,
        data.queryExecution, executorKafkaParams, topic)
      latestBatchId = batchId
    }
  }
}
@@ -23,12 +23,14 @@ import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
@@ -36,8 +38,12 @@ import org.apache.spark.sql.types.StructType
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSourceProvider
with RelationProvider with Logging {
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
with CreatableRelationProvider
with Logging {
import KafkaSourceProvider._
override def shortName(): String = "kafka"
@@ -152,6 +158,72 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
endingRelationOffsets)
}
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
new KafkaSink(sqlContext,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
}
override def createRelation(
outerSQLContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
mode match {
case SaveMode.Overwrite | SaveMode.Ignore =>
throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
s"Allowed save modes are ${SaveMode.Append} and " +
s"${SaveMode.ErrorIfExists} (default).")
case _ => // good
}
val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
/* This method is supposed to return a relation that reads the data that was written.
 * We cannot support this for Kafka. Therefore, in order to make things consistent,
 * we return an empty base relation.
 */
new BaseRelation {
override def sqlContext: SQLContext = unsupportedException
override def schema: StructType = unsupportedException
override def needConversion: Boolean = unsupportedException
override def sizeInBytes: Long = unsupportedException
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
private def unsupportedException =
throw new UnsupportedOperationException("BaseRelation from Kafka write " +
"operation is not usable.")
}
}
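/**
 * Strips the "kafka." prefix from user-supplied producer options and forces
 * ByteArraySerializer for both keys and values; user-specified serializers are rejected
 * because keys and values are always written as binary.
 */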
private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
+ "are serialized with ByteArraySerializer.")
}
if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
{
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
+ "value are serialized with ByteArraySerializer.")
}
parameters
.keySet
.filter(_.toLowerCase.startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
}
private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
ConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -381,6 +453,7 @@ private[kafka010] object KafkaSourceProvider {
private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
val TOPIC_OPTION_KEY = "topic"
private val deserClassName = classOf[ByteArrayDeserializer].getName
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import org.apache.kafka.clients.producer.{KafkaProducer, _}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
import org.apache.spark.sql.types.{BinaryType, StringType}
/**
 * A simple class for writing out data in a single Spark task, without any concerns about how
 * to commit or abort tasks. Exceptions thrown by the implementation of this class will
 * automatically trigger task aborts.
 */
private[kafka010] class KafkaWriteTask(
    producerConfiguration: ju.Map[String, Object],
    inputSchema: Seq[Attribute],
    topic: Option[String]) {
  // used to synchronize with Kafka callbacks
  @volatile private var failedWrite: Exception = null
  private val projection = createProjection
  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _

  /**
   * Writes key value data out to topics.
   */
  def execute(iterator: Iterator[InternalRow]): Unit = {
    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
    while (iterator.hasNext && failedWrite == null) {
      val currentRow = iterator.next()
      val projectedRow = projection(currentRow)
      val topic = projectedRow.getUTF8String(0)
      val key = projectedRow.getBinary(1)
      val value = projectedRow.getBinary(2)
      if (topic == null) {
        throw new NullPointerException(s"null topic present in the data. Use the " +
          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
      }
      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
      val callback = new Callback() {
        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
          if (failedWrite == null && e != null) {
            failedWrite = e
          }
        }
      }
      producer.send(record, callback)
    }
  }

  def close(): Unit = {
    if (producer != null) {
      checkForErrors
      producer.close()
      checkForErrors
      producer = null
    }
  }
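  /**
   * Builds an UnsafeProjection that maps each input row to (topic, key, value): the topic
   * comes from the 'topic' option if set, otherwise from the row's topic attribute, and key
   * and value are cast to binary. Fails if required attributes are missing or mistyped.
   */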
  private def createProjection: UnsafeProjection = {
    val topicExpression = topic.map(Literal(_)).orElse {
      inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
    }.getOrElse {
      throw new IllegalStateException(s"topic option required when no " +
        s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
    }
    topicExpression.dataType match {
      case StringType => // good
      case t =>
        throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
          s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
          s"must be a ${StringType}")
    }
    val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
      .getOrElse(Literal(null, BinaryType))
    keyExpression.dataType match {
      case StringType | BinaryType => // good
      case t =>
        throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
          s"attribute unsupported type $t")
    }
    val valueExpression = inputSchema
      .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
        throw new IllegalStateException(s"Required attribute " +
          s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
      )
    valueExpression.dataType match {
      case StringType | BinaryType => // good
      case t =>
        throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
          s"attribute unsupported type $t")
    }
    UnsafeProjection.create(
      Seq(topicExpression, Cast(keyExpression, BinaryType),
        Cast(valueExpression, BinaryType)), inputSchema)
  }

  private def checkForErrors: Unit = {
    if (failedWrite != null) {
      throw failedWrite
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.{util => ju}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.types.{BinaryType, StringType}
import org.apache.spark.util.Utils
/**
 * The [[KafkaWriter]] class is used to write data from a batch query
 * or structured streaming query, given by a [[QueryExecution]], to Kafka.
 * The data is assumed to have a value column, and optional topic and key
 * columns. If the topic column is missing, then the topic must come from
 * the 'topic' configuration option. If the key column is missing, then a
 * null valued key field will be added to the
 * [[org.apache.kafka.clients.producer.ProducerRecord]].
 */
private[kafka010] object KafkaWriter extends Logging {
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  val KEY_ATTRIBUTE_NAME: String = "key"
  val VALUE_ATTRIBUTE_NAME: String = "value"

  override def toString: String = "KafkaWriter"
  def validateQuery(
      queryExecution: QueryExecution,
      kafkaParameters: ju.Map[String, Object],
      topic: Option[String] = None): Unit = {
    val schema = queryExecution.logical.output
    schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
      if (topic == None) {
        throw new AnalysisException(s"topic option required when no " +
          s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
      } else {
        Literal(topic.get, StringType)
      }
    ).dataType match {
      case StringType => // good
      case _ =>
        throw new AnalysisException(s"Topic type must be a String")
    }
    schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
      Literal(null, StringType)
    ).dataType match {
      case StringType | BinaryType => // good
      case _ =>
        throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
          s"must be a String or BinaryType")
    }
    schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
      throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
    ).dataType match {
      case StringType | BinaryType => // good
      case _ =>
        throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
          s"must be a String or BinaryType")
    }
  }

  def write(
      sparkSession: SparkSession,
      queryExecution: QueryExecution,
      kafkaParameters: ju.Map[String, Object],
      topic: Option[String] = None): Unit = {
    val schema = queryExecution.logical.output
    validateQuery(queryExecution, kafkaParameters, topic)
    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
      queryExecution.toRdd.foreachPartition { iter =>
        val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
        Utils.tryWithSafeFinally(block = writeTask.execute(iter))(
          finallyBlock = writeTask.close())
      }
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BinaryType, DataType}
class KafkaSinkSuite extends StreamTest with SharedSQLContext {
import testImplicits._
protected var testUtils: KafkaTestUtils = _
override val streamingTimeout = 30.seconds
override def beforeAll(): Unit = {
super.beforeAll()
testUtils = new KafkaTestUtils(
withBrokerProps = Map("auto.create.topics.enable" -> "false"))
testUtils.setup()
}
override def afterAll(): Unit = {
if (testUtils != null) {
testUtils.teardown()
testUtils = null
super.afterAll()
}
}
test("batch - write to kafka") {
val topic = newTopic()
testUtils.createTopic(topic)
val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", topic)
.save()
checkAnswer(
createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
}
test("batch - null topic field value, and no topic option") {
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
val ex = intercept[SparkException] {
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.save()
}
assert(ex.getMessage.toLowerCase.contains(
"null topic present in the data"))
}
test("batch - unsupported save modes") {
val topic = newTopic()
testUtils.createTopic(topic)
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
// Test bad save mode Ignore
var ex = intercept[AnalysisException] {
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.mode(SaveMode.Ignore)
.save()
}
assert(ex.getMessage.toLowerCase.contains(
s"save mode ignore not allowed for kafka"))
// Test bad save mode Overwrite
ex = intercept[AnalysisException] {
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.mode(SaveMode.Overwrite)
.save()
}
assert(ex.getMessage.toLowerCase.contains(
s"save mode overwrite not allowed for kafka"))
}
test("streaming - write to kafka with topic field") {
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
val writer = createKafkaWriter(
input.toDF(),
withTopic = None,
withOutputMode = Some(OutputMode.Append))(
withSelectExpr = s"'$topic' as topic", "value")
val reader = createKafkaReader(topic)
.selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
.selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
.as[(Int, Int)]
.map(_._2)
try {
input.addData("1", "2", "3", "4", "5")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
input.addData("6", "7", "8", "9", "10")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
} finally {
writer.stop()
}
}
test("streaming - write aggregation w/o topic field, with topic option") {
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
val writer = createKafkaWriter(
input.toDF().groupBy("value").count(),
withTopic = Some(topic),
withOutputMode = Some(OutputMode.Update()))(
withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value")
val reader = createKafkaReader(topic)
.selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
.selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
.as[(Int, Int)]
try {
input.addData("1", "2", "2", "3", "3", "3")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
input.addData("1", "2", "3")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
} finally {
writer.stop()
}
}
test("streaming - aggregation with topic field and topic option") {
/* The purpose of this test is to ensure that the topic option
 * overrides the topic field. We begin by writing data that includes
 * a topic field (e.g., 'foo') along with a topic option. When we then
 * read from the topic specified in the option, we should see the data,
 * i.e., it was written to the topic given by the option and not to the
 * topic in the data (e.g., 'foo').
 */
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
val writer = createKafkaWriter(
input.toDF().groupBy("value").count(),
withTopic = Some(topic),
withOutputMode = Some(OutputMode.Update()))(
withSelectExpr = "'foo' as topic",
"CAST(value as STRING) key", "CAST(count as STRING) value")
val reader = createKafkaReader(topic)
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
.as[(Int, Int)]
try {
input.addData("1", "2", "2", "3", "3", "3")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
input.addData("1", "2", "3")
failAfter(streamingTimeout) {
writer.processAllAvailable()
}
checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
} finally {
writer.stop()
}
}
test("streaming - write data with bad schema") {
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
/* No topic field or topic option */
var writer: StreamingQuery = null
var ex: Exception = null
try {
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(input.toDF())(
withSelectExpr = "value as key", "value"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage
.toLowerCase
.contains("topic option required when no 'topic' attribute is present"))
try {
/* No value field */
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "value as key"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
}
test("streaming - write data with valid schema but wrong types") {
val input = MemoryStream[String]
val topic = newTopic()
testUtils.createTopic(topic)
var writer: StreamingQuery = null
var ex: Exception = null
try {
/* topic field wrong type */
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(input.toDF())(
withSelectExpr = s"CAST('1' as INT) as topic", "value"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase.contains("topic type must be a string"))
try {
/* value field wrong type */
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase.contains(
"value attribute type must be a string or binarytype"))
try {
ex = intercept[StreamingQueryException] {
/* key field wrong type */
writer = createKafkaWriter(input.toDF())(
withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
)
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase.contains(
"key attribute type must be a string or binarytype"))
}
test("streaming - write to non-existing topic") {
val input = MemoryStream[String]
val topic = newTopic()
var writer: StreamingQuery = null
var ex: Exception = null
try {
ex = intercept[StreamingQueryException] {
writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
input.addData("1", "2", "3", "4", "5")
writer.processAllAvailable()
}
} finally {
writer.stop()
}
assert(ex.getMessage.toLowerCase.contains("job aborted"))
}
test("streaming - exception on config serializer") {
val input = MemoryStream[String]
var writer: StreamingQuery = null
var ex: Exception = null
ex = intercept[IllegalArgumentException] {
writer = createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.key.serializer" -> "foo"))()
}
assert(ex.getMessage.toLowerCase.contains(
"kafka option 'key.serializer' is not supported"))
ex = intercept[IllegalArgumentException] {
writer = createKafkaWriter(
input.toDF(),
withOptions = Map("kafka.value.serializer" -> "foo"))()
}
assert(ex.getMessage.toLowerCase.contains(
"kafka option 'value.serializer' is not supported"))
}
test("generic - write big data with small producer buffer") {
/* This test ensures that we understand the semantics of Kafka when
* it comes to blocking on a call to send when the send buffer is full.
* This test will configure the smallest possible producer buffer and
* indicate that we should block when it is full. Thus, no exception should
* be thrown in the case of a full buffer.
*/
val topic = newTopic()
testUtils.createTopic(topic, 1)
val options = new java.util.HashMap[String, Object]
options.put("bootstrap.servers", testUtils.brokerAddress)
options.put("buffer.memory", "16384") // min buffer size
options.put("block.on.buffer.full", "true")
options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
val inputSchema = Seq(AttributeReference("value", BinaryType)())
val data = new Array[Byte](15000) // large value
val writeTask = new KafkaWriteTask(options, inputSchema, Some(topic))
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
val row = new SpecificInternalRow(fieldTypes)
row.update(0, data)
val iter = Seq.fill(1000)(converter.apply(row)).iterator
writeTask.execute(iter)
} finally {
writeTask.close()
}
}
private val topicId = new AtomicInteger(0)
private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
private def createKafkaReader(topic: String): DataFrame = {
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("subscribe", topic)
.load()
}
private def createKafkaWriter(
input: DataFrame,
withTopic: Option[String] = None,
withOutputMode: Option[OutputMode] = None,
withOptions: Map[String, String] = Map[String, String]())
(withSelectExpr: String*): StreamingQuery = {
var stream: DataStreamWriter[Row] = null
withTempDir { checkpointDir =>
var df = input.toDF()
if (withSelectExpr.length > 0) {
df = df.selectExpr(withSelectExpr: _*)
}
stream = df.writeStream
.format("kafka")
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.queryName("kafkaStream")
withTopic.foreach(stream.option("topic", _))
withOutputMode.foreach(stream.outputMode(_))
withOptions.foreach(opt => stream.option(opt._1, opt._2))
}
stream.start()
}
}