diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala new file mode 100644 index 0000000000000000000000000000000000000000..08914d82fffdd019495164a072b5f2e7151c7e66 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSink.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.Sink + +private[kafka010] class KafkaSink( + sqlContext: SQLContext, + executorKafkaParams: ju.Map[String, Object], + topic: Option[String]) extends Sink with Logging { + @volatile private var latestBatchId = -1L + + override def toString(): String = "KafkaSink" + + override def addBatch(batchId: Long, data: DataFrame): Unit = { + if (batchId <= latestBatchId) { + logInfo(s"Skipping already committed batch $batchId") + } else { + KafkaWriter.write(sqlContext.sparkSession, + data.queryExecution, executorKafkaParams, topic) + latestBatchId = batchId + } + } +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 6a7456719875fa8d992b339a11d8a91ecaff898f..febe3c217122a81f6256bd3e5343aea4d085fa28 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -23,12 +23,14 @@ import java.util.UUID import scala.collection.JavaConverters._ import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.spark.internal.Logging -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.streaming.Source +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType /** @@ -36,8 +38,12 @@ import org.apache.spark.sql.types.StructType * IllegalArgumentException when the Kafka Dataset is created, so that it can catch * missing options even before the query is started. 
 */
-private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSourceProvider
-  with RelationProvider with Logging {
+private[kafka010] class KafkaSourceProvider extends DataSourceRegister
+  with StreamSourceProvider
+  with StreamSinkProvider
+  with RelationProvider
+  with CreatableRelationProvider
+  with Logging {
   import KafkaSourceProvider._
 
   override def shortName(): String = "kafka"
@@ -152,6 +158,72 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with Stre
       endingRelationOffsets)
   }
 
+  override def createSink(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    val specifiedKafkaParams = kafkaParamsForProducer(parameters)
+    new KafkaSink(sqlContext,
+      new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
+  }
+
+  override def createRelation(
+      outerSQLContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    mode match {
+      case SaveMode.Overwrite | SaveMode.Ignore =>
+        throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
+          s"Allowed save modes are ${SaveMode.Append} and " +
+          s"${SaveMode.ErrorIfExists} (default).")
+      case _ => // good
+    }
+    val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
+    val specifiedKafkaParams = kafkaParamsForProducer(parameters)
+    KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
+      new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
+
+    /* This method is supposed to return a relation that reads the data that was written.
+     * We cannot support this for Kafka. Therefore, in order to make things consistent,
+     * we return an empty base relation.
+     */
+    new BaseRelation {
+      override def sqlContext: SQLContext = unsupportedException
+      override def schema: StructType = unsupportedException
+      override def needConversion: Boolean = unsupportedException
+      override def sizeInBytes: Long = unsupportedException
+      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
+      private def unsupportedException =
+        throw new UnsupportedOperationException("BaseRelation from Kafka write " +
+          "operation is not usable.")
+    }
+  }
+
+  private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
+    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
+      throw new IllegalArgumentException(
+        s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys " +
+          "are serialized with ByteArraySerializer.")
+    }
+
+    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
+    {
+      throw new IllegalArgumentException(
+        s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as " +
+          "values are serialized with ByteArraySerializer.")
+    }
+    parameters
+      .keySet
+      .filter(_.toLowerCase.startsWith("kafka."))
+      .map { k => k.drop(6) -> parameters(k) }
+      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
+  }
+
   private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
     ConfigUpdater("source", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
@@ -381,6 +453,7 @@ private[kafka010] object KafkaSourceProvider {
   private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+  val TOPIC_OPTION_KEY = "topic"
   private val deserClassName = classOf[ByteArrayDeserializer].getName
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6e160cbe2db5268aff2afdc181daf3df499d278b
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.producer._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
+import org.apache.spark.sql.types.{BinaryType, StringType}
+
+/**
+ * A simple class for writing out data in a single Spark task, without any concerns about how
+ * to commit or abort tasks. Exceptions thrown by the implementation of this class will
+ * automatically trigger task aborts.
+ */
+private[kafka010] class KafkaWriteTask(
+    producerConfiguration: ju.Map[String, Object],
+    inputSchema: Seq[Attribute],
+    topic: Option[String]) {
+  // used to synchronize with Kafka callbacks
+  @volatile private var failedWrite: Exception = null
+  private val projection = createProjection
+  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
+
+  /**
+   * Writes key value data out to topics.
+   */
+  def execute(iterator: Iterator[InternalRow]): Unit = {
+    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfiguration)
+    while (iterator.hasNext && failedWrite == null) {
+      val currentRow = iterator.next()
+      val projectedRow = projection(currentRow)
+      val topic = projectedRow.getUTF8String(0)
+      val key = projectedRow.getBinary(1)
+      val value = projectedRow.getBinary(2)
+      if (topic == null) {
+        throw new NullPointerException(s"null topic present in the data. Use the " +
+          s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
+      }
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+      val callback = new Callback() {
+        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
+          if (failedWrite == null && e != null) {
+            failedWrite = e
+          }
+        }
+      }
+      producer.send(record, callback)
+    }
+  }
+
+  def close(): Unit = {
+    if (producer != null) {
+      checkForErrors()
+      producer.close()
+      checkForErrors()
+      producer = null
+    }
+  }
+
+  private def createProjection: UnsafeProjection = {
+    val topicExpression = topic.map(Literal(_)).orElse {
+      inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
+    }.getOrElse {
+      throw new IllegalStateException(s"topic option required when no " +
+        s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
+    }
+    topicExpression.dataType match {
+      case StringType => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
+          s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
+          s"must be a ${StringType}")
+    }
+    val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
+      .getOrElse(Literal(null, BinaryType))
+    keyExpression.dataType match {
+      case StringType | BinaryType => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
+          s"attribute unsupported type $t")
+    }
+    val valueExpression = inputSchema
+      .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
+        throw new IllegalStateException(s"Required attribute " +
+          s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
+      )
+    valueExpression.dataType match {
+      case StringType | BinaryType => // good
+      case t =>
+        throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
+          s"attribute unsupported type $t")
+    }
+    UnsafeProjection.create(
+      Seq(topicExpression, Cast(keyExpression, BinaryType),
+        Cast(valueExpression, BinaryType)), inputSchema)
+  }
+
+  private def checkForErrors(): Unit = {
+    if (failedWrite != null) {
+      throw failedWrite
+    }
+  }
+}
+
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a637d52c933a33d4f0b9d390122bfe01db9ff5ee
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.types.{BinaryType, StringType}
+import org.apache.spark.util.Utils
+
+/**
+ * The [[KafkaWriter]] class is used to write data from a batch query
+ * or structured streaming query, given by a [[QueryExecution]], to Kafka.
+ * The data is assumed to have a value column, and optional topic and key
+ * columns. If the topic column is missing, then the topic must come from
+ * the 'topic' configuration option. If the key column is missing, then a
+ * null-valued key field will be added to the
+ * [[org.apache.kafka.clients.producer.ProducerRecord]].
+ */ +private[kafka010] object KafkaWriter extends Logging { + val TOPIC_ATTRIBUTE_NAME: String = "topic" + val KEY_ATTRIBUTE_NAME: String = "key" + val VALUE_ATTRIBUTE_NAME: String = "value" + + override def toString: String = "KafkaWriter" + + def validateQuery( + queryExecution: QueryExecution, + kafkaParameters: ju.Map[String, Object], + topic: Option[String] = None): Unit = { + val schema = queryExecution.logical.output + schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( + if (topic == None) { + throw new AnalysisException(s"topic option required when no " + + s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " + + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.") + } else { + Literal(topic.get, StringType) + } + ).dataType match { + case StringType => // good + case _ => + throw new AnalysisException(s"Topic type must be a String") + } + schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse( + Literal(null, StringType) + ).dataType match { + case StringType | BinaryType => // good + case _ => + throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " + + s"must be a String or BinaryType") + } + schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse( + throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found") + ).dataType match { + case StringType | BinaryType => // good + case _ => + throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " + + s"must be a String or BinaryType") + } + } + + def write( + sparkSession: SparkSession, + queryExecution: QueryExecution, + kafkaParameters: ju.Map[String, Object], + topic: Option[String] = None): Unit = { + val schema = queryExecution.logical.output + validateQuery(queryExecution, kafkaParameters, topic) + SQLExecution.withNewExecutionId(sparkSession, queryExecution) { + queryExecution.toRdd.foreachPartition { iter => + val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic) + Utils.tryWithSafeFinally(block = writeTask.execute(iter))( + finallyBlock = writeTask.close()) + } + } + } +} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..490535623cb3670593c0d34486a3fafa5da8a5c5 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+class KafkaSinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override val streamingTimeout = 30.seconds
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils(
+      withBrokerProps = Map("auto.create.topics.enable" -> "false"))
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  test("batch - write to kafka") {
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    val df = Seq("1", "2", "3", "4", "5").map(v => (topic, v)).toDF("topic", "value")
+    df.write
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("topic", topic)
+      .save()
+    checkAnswer(
+      createKafkaReader(topic).selectExpr("CAST(value as STRING) value"),
+      Row("1") :: Row("2") :: Row("3") :: Row("4") :: Row("5") :: Nil)
+  }
+
+  test("batch - null topic field value, and no topic option") {
+    val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+    val ex = intercept[SparkException] {
+      df.write
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .save()
+    }
+    assert(ex.getMessage.toLowerCase.contains(
+      "null topic present in the data"))
+  }
+
+  test("batch - unsupported save modes") {
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+
+    // Test bad save mode Ignore
+    var ex = intercept[AnalysisException] {
+      df.write
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .mode(SaveMode.Ignore)
+        .save()
+    }
+    assert(ex.getMessage.toLowerCase.contains(
+      "save mode ignore not allowed for kafka"))
+
+    // Test bad save mode Overwrite
+    ex = intercept[AnalysisException] {
+      df.write
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+    assert(ex.getMessage.toLowerCase.contains(
+      "save mode overwrite not allowed for kafka"))
+  }
+
+  test("streaming - write to kafka with topic field") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = None,
+      withOutputMode = Some(OutputMode.Append))(
+      withSelectExpr = s"'$topic' as topic", "value")
+
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+      .as[(Int, Int)]
+      .map(_._2)
+
+    try {
+      input.addData("1", "2", "3", "4", "5")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+      input.addData("6", "7", "8", "9", "10")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("streaming - write aggregation w/o topic field, with topic option") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF().groupBy("value").count(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))(
+      withSelectExpr = "CAST(value as STRING) key", "CAST(count as STRING) value")
+
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+      .as[(Int, Int)]
+
+    try {
+      input.addData("1", "2", "2", "3", "3", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
+      input.addData("1", "2", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("streaming - aggregation with topic field and topic option") {
+    /* The purpose of this test is to ensure that the topic option
+     * overrides the topic field. We begin by writing some data that
+     * includes a topic field (here, 'foo') along with a topic
+     * option. Then, when we read from the topic specified in the option,
+     * we should see the data, i.e., the data was written to the topic
+     * given by the option and not to the topic ('foo') in the data.
+     */
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF().groupBy("value").count(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Update()))(
+      withSelectExpr = "'foo' as topic",
+      "CAST(value as STRING) key", "CAST(count as STRING) value")
+
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
+      .as[(Int, Int)]
+
+    try {
+      input.addData("1", "2", "2", "3", "3", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3))
+      input.addData("1", "2", "3")
+      failAfter(streamingTimeout) {
+        writer.processAllAvailable()
+      }
+      checkDatasetUnorderly(reader, (1, 1), (2, 2), (3, 3), (1, 2), (2, 3), (3, 4))
+    } finally {
+      writer.stop()
+    }
+  }
+
+
+  test("streaming - write data with bad schema") {
+    val input = MemoryStream[String]
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    /* No topic field or topic option */
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = "value as key", "value"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage
+      .toLowerCase
+      .contains("topic option required when no 'topic' attribute is present"))
+
+    try {
+      /* No value field */
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF())(
+          withSelectExpr = s"'$topic' as topic", "value as key"
+        )
+        input.addData("1", "2", "3", "4", "5")
+        writer.processAllAvailable()
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase.contains("required attribute 'value' not found"))
+  }
+
test("streaming - write data with valid schema but wrong types") { + val input = MemoryStream[String] + val topic = newTopic() + testUtils.createTopic(topic) + + var writer: StreamingQuery = null + var ex: Exception = null + try { + /* topic field wrong type */ + ex = intercept[StreamingQueryException] { + writer = createKafkaWriter(input.toDF())( + withSelectExpr = s"CAST('1' as INT) as topic", "value" + ) + input.addData("1", "2", "3", "4", "5") + writer.processAllAvailable() + } + } finally { + writer.stop() + } + assert(ex.getMessage.toLowerCase.contains("topic type must be a string")) + + try { + /* value field wrong type */ + ex = intercept[StreamingQueryException] { + writer = createKafkaWriter(input.toDF())( + withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value" + ) + input.addData("1", "2", "3", "4", "5") + writer.processAllAvailable() + } + } finally { + writer.stop() + } + assert(ex.getMessage.toLowerCase.contains( + "value attribute type must be a string or binarytype")) + + try { + ex = intercept[StreamingQueryException] { + /* key field wrong type */ + writer = createKafkaWriter(input.toDF())( + withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value" + ) + input.addData("1", "2", "3", "4", "5") + writer.processAllAvailable() + } + } finally { + writer.stop() + } + assert(ex.getMessage.toLowerCase.contains( + "key attribute type must be a string or binarytype")) + } + + test("streaming - write to non-existing topic") { + val input = MemoryStream[String] + val topic = newTopic() + + var writer: StreamingQuery = null + var ex: Exception = null + try { + ex = intercept[StreamingQueryException] { + writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))() + input.addData("1", "2", "3", "4", "5") + writer.processAllAvailable() + } + } finally { + writer.stop() + } + assert(ex.getMessage.toLowerCase.contains("job aborted")) + } + + test("streaming - exception on config serializer") { + val input = MemoryStream[String] + var writer: StreamingQuery = null + var ex: Exception = null + ex = intercept[IllegalArgumentException] { + writer = createKafkaWriter( + input.toDF(), + withOptions = Map("kafka.key.serializer" -> "foo"))() + } + assert(ex.getMessage.toLowerCase.contains( + "kafka option 'key.serializer' is not supported")) + + ex = intercept[IllegalArgumentException] { + writer = createKafkaWriter( + input.toDF(), + withOptions = Map("kafka.value.serializer" -> "foo"))() + } + assert(ex.getMessage.toLowerCase.contains( + "kafka option 'value.serializer' is not supported")) + } + + test("generic - write big data with small producer buffer") { + /* This test ensures that we understand the semantics of Kafka when + * is comes to blocking on a call to send when the send buffer is full. + * This test will configure the smallest possible producer buffer and + * indicate that we should block when it is full. Thus, no exception should + * be thrown in the case of a full buffer. 
+ */ + val topic = newTopic() + testUtils.createTopic(topic, 1) + val options = new java.util.HashMap[String, Object] + options.put("bootstrap.servers", testUtils.brokerAddress) + options.put("buffer.memory", "16384") // min buffer size + options.put("block.on.buffer.full", "true") + options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) + options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) + val inputSchema = Seq(AttributeReference("value", BinaryType)()) + val data = new Array[Byte](15000) // large value + val writeTask = new KafkaWriteTask(options, inputSchema, Some(topic)) + try { + val fieldTypes: Array[DataType] = Array(BinaryType) + val converter = UnsafeProjection.create(fieldTypes) + val row = new SpecificInternalRow(fieldTypes) + row.update(0, data) + val iter = Seq.fill(1000)(converter.apply(row)).iterator + writeTask.execute(iter) + } finally { + writeTask.close() + } + } + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"topic-${topicId.getAndIncrement()}" + + private def createKafkaReader(topic: String): DataFrame = { + spark.read + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("startingOffsets", "earliest") + .option("endingOffsets", "latest") + .option("subscribe", topic) + .load() + } + + private def createKafkaWriter( + input: DataFrame, + withTopic: Option[String] = None, + withOutputMode: Option[OutputMode] = None, + withOptions: Map[String, String] = Map[String, String]()) + (withSelectExpr: String*): StreamingQuery = { + var stream: DataStreamWriter[Row] = null + withTempDir { checkpointDir => + var df = input.toDF() + if (withSelectExpr.length > 0) { + df = df.selectExpr(withSelectExpr: _*) + } + stream = df.writeStream + .format("kafka") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .queryName("kafkaStream") + withTopic.foreach(stream.option("topic", _)) + withOutputMode.foreach(stream.outputMode(_)) + withOptions.foreach(opt => stream.option(opt._1, opt._2)) + } + stream.start() + } +}
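
For reference, below is a minimal usage sketch of the sink added by this patch. The broker address, topic names, and checkpoint path are illustrative placeholders, not part of the change. A batch DataFrame.write goes through KafkaSourceProvider.createRelation, while a streaming writeStream goes through createSink, whose KafkaSink hands each micro-batch to KafkaWriter.write.

import org.apache.spark.sql.SparkSession

object KafkaSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("KafkaSinkExample").getOrCreate()
    import spark.implicits._

    // Batch write: the DataFrame needs a 'value' column (String or Binary);
    // 'key' and 'topic' columns are optional. The 'topic' option supplies the
    // default topic when the data has no 'topic' column.
    Seq("1", "2", "3").toDF("value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092") // placeholder address
      .option("topic", "example-out")                  // placeholder topic
      .save()

    // Streaming write: each micro-batch goes through KafkaSink.addBatch,
    // which records the batch id so a re-offered batch is not written twice.
    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "example-in")               // placeholder topic
      .load()
      .selectExpr("CAST(key AS STRING) key", "CAST(value AS STRING) value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("topic", "example-out")
      .option("checkpointLocation", "/tmp/kafka-sink-example") // placeholder
      .start()

    query.awaitTermination()
  }
}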